@20602303. Default file permission is now 755.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@20 62dab493-f737-651d-591e-8d6aee1b9529
main
dgrogan@chromium.org 14 years ago
parent 9e33808a26
commit f779e7a5d8
  1. 0
      AUTHORS
  2. 0
      LICENSE
  3. 0
      Makefile
  4. 0
      README
  5. 8
      TODO
  6. 0
      db/builder.cc
  7. 0
      db/builder.h
  8. 21
      db/corruption_test.cc
  9. 160
      db/db_bench.cc
  10. 270
      db/db_impl.cc
  11. 33
      db/db_impl.h
  12. 0
      db/db_iter.cc
  13. 0
      db/db_iter.h
  14. 27
      db/db_test.cc
  15. 0
      db/dbformat.cc
  16. 6
      db/dbformat.h
  17. 0
      db/dbformat_test.cc
  18. 0
      db/filename.cc
  19. 0
      db/filename.h
  20. 0
      db/filename_test.cc
  21. 0
      db/log_format.h
  22. 0
      db/log_reader.cc
  23. 0
      db/log_reader.h
  24. 0
      db/log_test.cc
  25. 0
      db/log_writer.cc
  26. 0
      db/log_writer.h
  27. 0
      db/memtable.cc
  28. 0
      db/memtable.h
  29. 0
      db/repair.cc
  30. 0
      db/skiplist.h
  31. 0
      db/skiplist_test.cc
  32. 0
      db/snapshot.h
  33. 0
      db/table_cache.cc
  34. 0
      db/table_cache.h
  35. 19
      db/version_edit.cc
  36. 6
      db/version_edit.h
  37. 0
      db/version_edit_test.cc
  38. 108
      db/version_set.cc
  39. 35
      db/version_set.h
  40. 0
      db/write_batch.cc
  41. 0
      db/write_batch_internal.h
  42. 0
      db/write_batch_test.cc
  43. 0
      doc/doc.css
  44. 0
      doc/impl.html
  45. 82
      doc/index.html
  46. 0
      doc/log_format.txt
  47. 0
      doc/table_format.txt
  48. 0
      include/leveldb/cache.h
  49. 0
      include/leveldb/comparator.h
  50. 12
      include/leveldb/db.h
  51. 0
      include/leveldb/env.h
  52. 0
      include/leveldb/iterator.h
  53. 27
      include/leveldb/options.h
  54. 0
      include/leveldb/slice.h
  55. 0
      include/leveldb/status.h
  56. 0
      include/leveldb/table.h
  57. 0
      include/leveldb/table_builder.h
  58. 0
      include/leveldb/write_batch.h
  59. 0
      leveldb.gyp
  60. 0
      port/README
  61. 0
      port/port.h
  62. 1
      port/port_android.cc
  63. 50
      port/port_android.h
  64. 0
      port/port_chromium.cc
  65. 0
      port/port_chromium.h
  66. 0
      port/port_example.h
  67. 0
      port/port_posix.cc
  68. 0
      port/port_posix.h
  69. 0
      port/sha1_portable.cc
  70. 0
      port/sha1_portable.h
  71. 0
      port/sha1_test.cc
  72. 0
      port/win/stdint.h
  73. 0
      table/block.cc
  74. 0
      table/block.h
  75. 0
      table/block_builder.cc
  76. 0
      table/block_builder.h
  77. 0
      table/format.cc
  78. 0
      table/format.h
  79. 0
      table/iterator.cc
  80. 0
      table/iterator_wrapper.h
  81. 0
      table/merger.cc
  82. 0
      table/merger.h
  83. 0
      table/table.cc
  84. 0
      table/table_builder.cc
  85. 4
      table/table_test.cc
  86. 0
      table/two_level_iterator.cc
  87. 0
      table/two_level_iterator.h
  88. 0
      util/arena.cc
  89. 0
      util/arena.h
  90. 0
      util/arena_test.cc
  91. 0
      util/cache.cc
  92. 0
      util/cache_test.cc
  93. 0
      util/coding.cc
  94. 0
      util/coding.h
  95. 0
      util/coding_test.cc
  96. 0
      util/comparator.cc
  97. 0
      util/crc32c.cc
  98. 0
      util/crc32c.h
  99. 0
      util/crc32c_test.cc
  100. 0
      util/env.cc
  101. Some files were not shown because too many files have changed in this diff Show More

@ -1,11 +1,3 @@
Before adding to chrome
-----------------------
- multi-threaded test/benchmark
- Allow missing crc32c in Table format?
Maybe afterwards
----------------
ss ss
- Stats - Stats

@ -8,6 +8,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include "leveldb/cache.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
@ -28,10 +29,12 @@ class CorruptionTest {
test::ErrorEnv env_; test::ErrorEnv env_;
Random rnd_; Random rnd_;
std::string dbname_; std::string dbname_;
Cache* tiny_cache_;
Options options_; Options options_;
DB* db_; DB* db_;
CorruptionTest() : rnd_(test::RandomSeed()) { CorruptionTest() : rnd_(test::RandomSeed()) {
tiny_cache_ = NewLRUCache(100);
options_.env = &env_; options_.env = &env_;
dbname_ = test::TmpDir() + "/db_test"; dbname_ = test::TmpDir() + "/db_test";
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
@ -45,6 +48,7 @@ class CorruptionTest {
~CorruptionTest() { ~CorruptionTest() {
delete db_; delete db_;
DestroyDB(dbname_, Options()); DestroyDB(dbname_, Options());
delete tiny_cache_;
} }
Status TryReopen(Options* options = NULL) { Status TryReopen(Options* options = NULL) {
@ -52,6 +56,7 @@ class CorruptionTest {
db_ = NULL; db_ = NULL;
Options opt = (options ? *options : options_); Options opt = (options ? *options : options_);
opt.env = &env_; opt.env = &env_;
opt.block_cache = tiny_cache_;
return DB::Open(opt, dbname_, &db_); return DB::Open(opt, dbname_, &db_);
} }
@ -160,12 +165,15 @@ class CorruptionTest {
ASSERT_TRUE(s.ok()) << s.ToString(); ASSERT_TRUE(s.ok()) << s.ToString();
} }
uint64_t Property(const std::string& name) { int Property(const std::string& name) {
uint64_t result; std::string property;
if (!db_->GetProperty(name, &result)) { int result;
result = ~static_cast<uint64_t>(0); if (db_->GetProperty(name, &property) &&
} sscanf(property.c_str(), "%d", &result) == 1) {
return result; return result;
} else {
return -1;
}
} }
// Return the ith key // Return the ith key
@ -235,7 +243,7 @@ TEST(CorruptionTest, TableFileIndexData) {
dbi->TEST_CompactRange(0, "", "~"); dbi->TEST_CompactRange(0, "", "~");
dbi->TEST_CompactRange(1, "", "~"); dbi->TEST_CompactRange(1, "", "~");
Corrupt(kTableFile, -1000, 500); Corrupt(kTableFile, -2000, 500);
Reopen(); Reopen();
Check(5000, 9999); Check(5000, 9999);
} }
@ -327,6 +335,7 @@ TEST(CorruptionTest, CompactionInputError) {
TEST(CorruptionTest, CompactionInputErrorParanoid) { TEST(CorruptionTest, CompactionInputErrorParanoid) {
Options options; Options options;
options.paranoid_checks = true; options.paranoid_checks = true;
options.write_buffer_size = 1048576;
Reopen(&options); Reopen(&options);
Build(10); Build(10);

@ -31,11 +31,8 @@
// sha1 -- repeated SHA1 computation over 4K of data // sha1 -- repeated SHA1 computation over 4K of data
// Meta operations: // Meta operations:
// compact -- Compact the entire DB // compact -- Compact the entire DB
// stats -- Print DB stats
// heapprofile -- Dump a heap profile (if supported by this port) // heapprofile -- Dump a heap profile (if supported by this port)
// sync -- switch to synchronous writes (not the default)
// nosync -- switch to asynchronous writes (the default)
// tenth -- divide N by 10 (i.e., following benchmarks are smaller)
// normal -- reset N back to its normal value (1000000)
static const char* FLAGS_benchmarks = static const char* FLAGS_benchmarks =
"fillseq," "fillseq,"
"fillsync," "fillsync,"
@ -51,7 +48,9 @@ static const char* FLAGS_benchmarks =
"readreverse," "readreverse,"
"fill100K," "fill100K,"
"crc32c," "crc32c,"
"sha1" "sha1,"
"snappycomp,"
"snappyuncomp,"
; ;
// Number of key/values to place in database // Number of key/values to place in database
@ -68,7 +67,12 @@ static double FLAGS_compression_ratio = 0.5;
static bool FLAGS_histogram = false; static bool FLAGS_histogram = false;
// Number of bytes to buffer in memtable before compacting // Number of bytes to buffer in memtable before compacting
static int FLAGS_write_buffer_size = 1 << 20; // (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;
// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;
namespace leveldb { namespace leveldb {
@ -129,6 +133,7 @@ class Benchmark {
double last_op_finish_; double last_op_finish_;
int64_t bytes_; int64_t bytes_;
std::string message_; std::string message_;
std::string post_message_;
Histogram hist_; Histogram hist_;
RandomGenerator gen_; RandomGenerator gen_;
Random rand_; Random rand_;
@ -146,7 +151,8 @@ class Benchmark {
static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
fprintf(stdout, "Entries: %d\n", num_); fprintf(stdout, "Entries: %d\n", num_);
fprintf(stdout, "RawSize: %.1f MB (estimated)\n", fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
(((kKeySize + FLAGS_value_size) * num_) / 1048576.0)); ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
/ 1048576.0));
fprintf(stdout, "FileSize: %.1f MB (estimated)\n", fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
(((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
/ 1048576.0)); / 1048576.0));
@ -164,6 +170,15 @@ class Benchmark {
fprintf(stdout, fprintf(stdout,
"WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif #endif
// See if snappy is working by attempting to compress a compressible string
const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
std::string compressed;
if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
} else if (compressed.size() >= sizeof(text)) {
fprintf(stdout, "WARNING: Snappy compression is not effective\n");
}
} }
void PrintEnvironment() { void PrintEnvironment() {
@ -225,15 +240,13 @@ class Benchmark {
done_++; done_++;
if (done_ >= next_report_) { if (done_ >= next_report_) {
if (next_report_ < 1000) { if (next_report_ < 1000) next_report_ += 100;
next_report_ += 100; else if (next_report_ < 5000) next_report_ += 500;
} else if (next_report_ < 10000) { else if (next_report_ < 10000) next_report_ += 1000;
next_report_ += 1000; else if (next_report_ < 50000) next_report_ += 5000;
} else if (next_report_ < 100000) { else if (next_report_ < 100000) next_report_ += 10000;
next_report_ += 10000; else if (next_report_ < 500000) next_report_ += 50000;
} else { else next_report_ += 100000;
next_report_ += 100000;
}
fprintf(stderr, "... finished %d ops%30s\r", done_, ""); fprintf(stderr, "... finished %d ops%30s\r", done_, "");
fflush(stderr); fflush(stderr);
} }
@ -248,7 +261,7 @@ class Benchmark {
if (bytes_ > 0) { if (bytes_ > 0) {
char rate[100]; char rate[100];
snprintf(rate, sizeof(rate), "%5.1f MB/s", snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / (finish - start_)); (bytes_ / 1048576.0) / (finish - start_));
if (!message_.empty()) { if (!message_.empty()) {
message_ = std::string(rate) + " " + message_; message_ = std::string(rate) + " " + message_;
@ -266,6 +279,11 @@ class Benchmark {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
} }
fflush(stdout); fflush(stdout);
if (!post_message_.empty()) {
fprintf(stdout, "\n%s\n", post_message_.c_str());
post_message_.clear();
}
} }
public: public:
@ -278,7 +296,8 @@ class Benchmark {
EXISTING EXISTING
}; };
Benchmark() : cache_(NewLRUCache(200<<20)), Benchmark()
: cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
db_(NULL), db_(NULL),
num_(FLAGS_num), num_(FLAGS_num),
heap_counter_(0), heap_counter_(0),
@ -318,38 +337,56 @@ class Benchmark {
Start(); Start();
WriteOptions write_options; WriteOptions write_options;
write_options.sync = false; bool known = true;
if (name == Slice("fillseq")) { if (name == Slice("fillseq")) {
Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size); Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1);
} else if (name == Slice("fillbatch")) {
Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1000);
} else if (name == Slice("fillrandom")) { } else if (name == Slice("fillrandom")) {
Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size); Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size, 1);
} else if (name == Slice("overwrite")) { } else if (name == Slice("overwrite")) {
Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size); Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1);
} else if (name == Slice("fillsync")) { } else if (name == Slice("fillsync")) {
write_options.sync = true; write_options.sync = true;
Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size); Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size, 1);
} else if (name == Slice("fill100K")) { } else if (name == Slice("fill100K")) {
Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000); Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1);
} else if (name == Slice("readseq")) { } else if (name == Slice("readseq")) {
ReadSequential(); ReadSequential();
} else if (name == Slice("readreverse")) { } else if (name == Slice("readreverse")) {
ReadReverse(); ReadReverse();
} else if (name == Slice("readrandom")) { } else if (name == Slice("readrandom")) {
ReadRandom(); ReadRandom();
} else if (name == Slice("readrandomsmall")) {
int n = num_;
num_ /= 1000;
ReadRandom();
num_ = n;
} else if (name == Slice("compact")) { } else if (name == Slice("compact")) {
Compact(); Compact();
} else if (name == Slice("crc32c")) { } else if (name == Slice("crc32c")) {
Crc32c(4096, "(4K per op)"); Crc32c(4096, "(4K per op)");
} else if (name == Slice("sha1")) { } else if (name == Slice("sha1")) {
SHA1(4096, "(4K per op)"); SHA1(4096, "(4K per op)");
} else if (name == Slice("snappycomp")) {
SnappyCompress();
} else if (name == Slice("snappyuncomp")) {
SnappyUncompress();
} else if (name == Slice("heapprofile")) { } else if (name == Slice("heapprofile")) {
HeapProfile(); HeapProfile();
} else if (name == Slice("stats")) {
PrintStats();
} else { } else {
known = false;
if (name != Slice()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
} }
}
if (known) {
Stop(name); Stop(name);
} }
} }
}
private: private:
void Crc32c(int size, const char* label) { void Crc32c(int size, const char* label) {
@ -387,11 +424,54 @@ class Benchmark {
message_ = label; message_ = label;
} }
void SnappyCompress() {
Slice input = gen_.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
FinishedSingleOp();
}
if (!ok) {
message_ = "(snappy failure)";
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
message_ = buf;
bytes_ = bytes;
}
}
void SnappyUncompress() {
Slice input = gen_.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
std::string uncompressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
&uncompressed);
bytes += uncompressed.size();
FinishedSingleOp();
}
if (!ok) {
message_ = "(snappy failure)";
} else {
bytes_ = bytes;
}
}
void Open() { void Open() {
assert(db_ == NULL); assert(db_ == NULL);
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.max_open_files = 10000;
options.block_cache = cache_; options.block_cache = cache_;
options.write_buffer_size = FLAGS_write_buffer_size; options.write_buffer_size = FLAGS_write_buffer_size;
Status s = DB::Open(options, "/tmp/dbbench", &db_); Status s = DB::Open(options, "/tmp/dbbench", &db_);
@ -402,7 +482,7 @@ class Benchmark {
} }
void Write(const WriteOptions& options, Order order, DBState state, void Write(const WriteOptions& options, Order order, DBState state,
int num_entries, int value_size) { int num_entries, int value_size, int entries_per_batch) {
if (state == FRESH) { if (state == FRESH) {
delete db_; delete db_;
db_ = NULL; db_ = NULL;
@ -420,19 +500,21 @@ class Benchmark {
WriteBatch batch; WriteBatch batch;
Status s; Status s;
std::string val; std::string val;
for (int i = 0; i < num_entries; i++) { for (int i = 0; i < num_entries; i += entries_per_batch) {
const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num); batch.Clear();
for (int j = 0; j < entries_per_batch; j++) {
const int k = (order == SEQUENTIAL) ? i+j : (rand_.Next() % FLAGS_num);
char key[100]; char key[100];
snprintf(key, sizeof(key), "%016d", k); snprintf(key, sizeof(key), "%016d", k);
batch.Clear();
batch.Put(key, gen_.Generate(value_size)); batch.Put(key, gen_.Generate(value_size));
s = db_->Write(options, &batch);
bytes_ += value_size + strlen(key); bytes_ += value_size + strlen(key);
FinishedSingleOp();
}
s = db_->Write(options, &batch);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str()); fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1); exit(1);
} }
FinishedSingleOp();
} }
} }
@ -475,10 +557,10 @@ class Benchmark {
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
int max_level_with_files = 1; int max_level_with_files = 1;
for (int level = 1; level < config::kNumLevels; level++) { for (int level = 1; level < config::kNumLevels; level++) {
uint64_t v; std::string property;
char name[100]; char name[100];
snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level); snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
if (db_->GetProperty(name, &v) && v > 0) { if (db_->GetProperty(name, &property) && atoi(property.c_str()) > 0) {
max_level_with_files = level; max_level_with_files = level;
} }
} }
@ -487,6 +569,15 @@ class Benchmark {
} }
} }
void PrintStats() {
std::string stats;
if (!db_->GetProperty("leveldb.stats", &stats)) {
message_ = "(failed)";
} else {
post_message_ = stats;
}
}
static void WriteToFile(void* arg, const char* buf, int n) { static void WriteToFile(void* arg, const char* buf, int n) {
reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n)); reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
} }
@ -512,6 +603,7 @@ class Benchmark {
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {
FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
double d; double d;
int n; int n;
@ -529,6 +621,8 @@ int main(int argc, char** argv) {
FLAGS_value_size = n; FLAGS_value_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
FLAGS_write_buffer_size = n; FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
FLAGS_cache_size = n;
} else { } else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]); fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1); exit(1);

@ -104,6 +104,9 @@ Options SanitizeOptions(const std::string& dbname,
result.info_log = new NullWritableFile; result.info_log = new NullWritableFile;
} }
} }
if (result.block_cache == NULL) {
result.block_cache = NewLRUCache(8 << 20);
}
return result; return result;
} }
@ -112,18 +115,20 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
internal_comparator_(options.comparator), internal_comparator_(options.comparator),
options_(SanitizeOptions(dbname, &internal_comparator_, options)), options_(SanitizeOptions(dbname, &internal_comparator_, options)),
owns_info_log_(options_.info_log != options.info_log), owns_info_log_(options_.info_log != options.info_log),
owns_cache_(options_.block_cache != options.block_cache),
dbname_(dbname), dbname_(dbname),
db_lock_(NULL), db_lock_(NULL),
shutting_down_(NULL), shutting_down_(NULL),
bg_cv_(&mutex_), bg_cv_(&mutex_),
compacting_cv_(&mutex_), compacting_cv_(&mutex_),
last_sequence_(0),
mem_(new MemTable(internal_comparator_)), mem_(new MemTable(internal_comparator_)),
imm_(NULL),
logfile_(NULL), logfile_(NULL),
log_(NULL), log_(NULL),
log_number_(0),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
compacting_(false) { compacting_(false) {
has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - 10; const int table_cache_size = options.max_open_files - 10;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size); table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
@ -149,6 +154,7 @@ DBImpl::~DBImpl() {
delete versions_; delete versions_;
delete mem_; delete mem_;
delete imm_;
delete log_; delete log_;
delete logfile_; delete logfile_;
delete table_cache_; delete table_cache_;
@ -156,15 +162,15 @@ DBImpl::~DBImpl() {
if (owns_info_log_) { if (owns_info_log_) {
delete options_.info_log; delete options_.info_log;
} }
if (owns_cache_) {
delete options_.block_cache;
}
} }
Status DBImpl::NewDB() { Status DBImpl::NewDB() {
assert(log_number_ == 0);
assert(last_sequence_ == 0);
VersionEdit new_db; VersionEdit new_db;
new_db.SetComparatorName(user_comparator()->Name()); new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(log_number_); new_db.SetLogNumber(0);
new_db.SetNextFile(2); new_db.SetNextFile(2);
new_db.SetLastSequence(0); new_db.SetLastSequence(0);
@ -193,15 +199,6 @@ Status DBImpl::NewDB() {
return s; return s;
} }
Status DBImpl::Install(VersionEdit* edit,
uint64_t new_log_number,
MemTable* cleanup_mem) {
mutex_.AssertHeld();
edit->SetLogNumber(new_log_number);
edit->SetLastSequence(last_sequence_);
return versions_->LogAndApply(edit, cleanup_mem);
}
void DBImpl::MaybeIgnoreError(Status* s) const { void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) { if (s->ok() || options_.paranoid_checks) {
// No change needed // No change needed
@ -216,7 +213,7 @@ void DBImpl::DeleteObsoleteFiles() {
std::set<uint64_t> live = pending_outputs_; std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live); versions_->AddLiveFiles(&live);
versions_->CleanupLargeValueRefs(live, log_number_); versions_->CleanupLargeValueRefs(live);
std::vector<std::string> filenames; std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
@ -228,7 +225,8 @@ void DBImpl::DeleteObsoleteFiles() {
bool keep = true; bool keep = true;
switch (type) { switch (type) {
case kLogFile: case kLogFile:
keep = (number == log_number_); keep = ((number == versions_->LogNumber()) ||
(number == versions_->PrevLogNumber()));
break; break;
case kDescriptorFile: case kDescriptorFile:
// Keep my manifest file, and any newer incarnations' // Keep my manifest file, and any newer incarnations'
@ -296,16 +294,20 @@ Status DBImpl::Recover(VersionEdit* edit) {
} }
} }
s = versions_->Recover(&log_number_, &last_sequence_); s = versions_->Recover();
if (s.ok()) { if (s.ok()) {
// Recover from the log file named in the descriptor // Recover from the log files named in the descriptor
SequenceNumber max_sequence(0); SequenceNumber max_sequence(0);
if (log_number_ != 0) { // log_number_ == 0 indicates initial empty state if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log
s = RecoverLogFile(log_number_, edit, &max_sequence); s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence);
}
if (s.ok() && versions_->LogNumber() != 0) { // log#==0 for initial state
s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence);
} }
if (s.ok()) { if (s.ok()) {
last_sequence_ = if (versions_->LastSequence() < max_sequence) {
last_sequence_ > max_sequence ? last_sequence_ : max_sequence; versions_->SetLastSequence(max_sequence);
}
} }
} }
@ -407,56 +409,58 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld(); mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta; FileMetaData meta;
meta.number = versions_->NewFileNumber(); meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number); pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator(); Iterator* iter = mem->NewIterator();
Log(env_, options_.info_log, "Level-0 table #%llu: started", Log(env_, options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number); (unsigned long long) meta.number);
Status s = BuildTable(dbname_, env_, options_, table_cache_,
iter, &meta, edit); Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit);
mutex_.Lock();
}
Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s", Log(env_, options_.info_log, "Level-0 table #%llu: %lld bytes %s",
(unsigned long long) meta.number, (unsigned long long) meta.number,
(unsigned long long) meta.file_size, (unsigned long long) meta.file_size,
s.ToString().c_str()); s.ToString().c_str());
delete iter; delete iter;
pending_outputs_.erase(meta.number); pending_outputs_.erase(meta.number);
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
stats_[0].Add(stats);
return s; return s;
} }
Status DBImpl::CompactMemTable() { Status DBImpl::CompactMemTable() {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(imm_ != NULL);
WritableFile* lfile = NULL; assert(compacting_);
uint64_t new_log_number = versions_->NewFileNumber();
VersionEdit edit;
// Save the contents of the memtable as a new Table // Save the contents of the memtable as a new Table
Status s = WriteLevel0Table(mem_, &edit); VersionEdit edit;
if (s.ok()) { Status s = WriteLevel0Table(imm_, &edit);
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
}
// Save a new descriptor with the new table and log number. // Replace immutable memtable with the generated Table
if (s.ok()) { if (s.ok()) {
s = Install(&edit, new_log_number, mem_); edit.SetPrevLogNumber(0);
s = versions_->LogAndApply(&edit, imm_);
} }
if (s.ok()) { if (s.ok()) {
// Commit to the new state // Commit to the new state
mem_ = new MemTable(internal_comparator_); imm_ = NULL;
delete log_; has_imm_.Release_Store(NULL);
delete logfile_;
logfile_ = lfile;
log_ = new log::Writer(lfile);
log_number_ = new_log_number;
DeleteObsoleteFiles(); DeleteObsoleteFiles();
MaybeScheduleCompaction();
} else {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
} }
compacting_cv_.SignalAll(); // Wake up waiter even if there was an error
return s; return s;
} }
@ -485,7 +489,17 @@ void DBImpl::TEST_CompactRange(
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return CompactMemTable(); Status s = MakeRoomForWrite(true /* force compaction */);
if (s.ok()) {
// Wait until the compaction completes
while (imm_ != NULL && bg_error_.ok()) {
compacting_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
}
return s;
} }
void DBImpl::MaybeScheduleCompaction() { void DBImpl::MaybeScheduleCompaction() {
@ -496,7 +510,7 @@ void DBImpl::MaybeScheduleCompaction() {
// Some other thread is running a compaction. Do not conflict with it. // Some other thread is running a compaction. Do not conflict with it.
} else if (shutting_down_.Acquire_Load()) { } else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions // DB is being deleted; no more background compactions
} else if (!versions_->NeedsCompaction()) { } else if (imm_ == NULL && !versions_->NeedsCompaction()) {
// No work to be done // No work to be done
} else { } else {
bg_compaction_scheduled_ = true; bg_compaction_scheduled_ = true;
@ -525,6 +539,16 @@ void DBImpl::BackgroundCall() {
void DBImpl::BackgroundCompaction() { void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(!compacting_);
if (imm_ != NULL) {
compacting_ = true;
CompactMemTable();
compacting_ = false;
compacting_cv_.SignalAll();
return;
}
Compaction* c = versions_->PickCompaction(); Compaction* c = versions_->PickCompaction();
if (c == NULL) { if (c == NULL) {
// Nothing to do // Nothing to do
@ -539,7 +563,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number); c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest); f->smallest, f->largest);
status = Install(c->edit(), log_number_, NULL); status = versions_->LogAndApply(c->edit(), NULL);
Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n", Log(env_, options_.info_log, "Moved #%lld to level-%d %lld bytes %s\n",
static_cast<unsigned long long>(f->number), static_cast<unsigned long long>(f->number),
c->level() + 1, c->level() + 1,
@ -680,7 +704,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
} }
compact->outputs.clear(); compact->outputs.clear();
Status s = Install(compact->compaction->edit(), log_number_, NULL); Status s = versions_->LogAndApply(compact->compaction->edit(), NULL);
if (s.ok()) { if (s.ok()) {
compact->compaction->ReleaseInputs(); compact->compaction->ReleaseInputs();
DeleteObsoleteFiles(); DeleteObsoleteFiles();
@ -694,6 +718,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
} }
Status DBImpl::DoCompactionWork(CompactionState* compact) { Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files", Log(env_, options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->level(),
@ -704,7 +731,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert(compact->builder == NULL); assert(compact->builder == NULL);
assert(compact->outfile == NULL); assert(compact->outfile == NULL);
if (snapshots_.empty()) { if (snapshots_.empty()) {
compact->smallest_snapshot = last_sequence_; compact->smallest_snapshot = versions_->LastSequence();
} else { } else {
compact->smallest_snapshot = snapshots_.oldest()->number_; compact->smallest_snapshot = snapshots_.oldest()->number_;
} }
@ -721,6 +748,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
bool has_current_user_key = false; bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != NULL) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != NULL) {
CompactMemTable();
compacting_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
imm_micros += (env_->NowMicros() - imm_start);
}
Slice key = input->key(); Slice key = input->key();
InternalKey tmp_internal_key; InternalKey tmp_internal_key;
tmp_internal_key.DecodeFrom(key); tmp_internal_key.DecodeFrom(key);
@ -835,7 +874,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
delete input; delete input;
input = NULL; input = NULL;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros - imm_micros;
for (int which = 0; which < 2; which++) {
for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
stats.bytes_read += compact->compaction->input(which, i)->file_size;
}
}
for (int i = 0; i < compact->outputs.size(); i++) {
stats.bytes_written += compact->outputs[i].file_size;
}
mutex_.Lock(); mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(compact); status = InstallCompactionResults(compact);
@ -848,11 +899,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) { SequenceNumber* latest_snapshot) {
mutex_.Lock(); mutex_.Lock();
*latest_snapshot = last_sequence_; *latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators // Collect together all needed child iterators
std::vector<Iterator*> list; std::vector<Iterator*> list;
list.push_back(mem_->NewIterator()); list.push_back(mem_->NewIterator());
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
}
versions_->current()->AddIterators(options, &list); versions_->current()->AddIterators(options, &list);
Iterator* internal_iter = Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size()); NewMergingIterator(&internal_comparator_, &list[0], list.size());
@ -912,7 +966,7 @@ void DBImpl::Unref(void* arg1, void* arg2) {
const Snapshot* DBImpl::GetSnapshot() { const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return snapshots_.New(last_sequence_); return snapshots_.New(versions_->LastSequence());
} }
void DBImpl::ReleaseSnapshot(const Snapshot* s) { void DBImpl::ReleaseSnapshot(const Snapshot* s) {
@ -935,17 +989,16 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatch* final = NULL; WriteBatch* final = NULL;
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
if (!bg_error_.ok()) { status = MakeRoomForWrite(false); // May temporarily release lock and wait
status = bg_error_;
} else if (mem_->ApproximateMemoryUsage() > options_.write_buffer_size) { uint64_t last_sequence = versions_->LastSequence();
status = CompactMemTable();
}
if (status.ok()) { if (status.ok()) {
status = HandleLargeValues(last_sequence_ + 1, updates, &final); status = HandleLargeValues(last_sequence + 1, updates, &final);
} }
if (status.ok()) { if (status.ok()) {
WriteBatchInternal::SetSequence(final, last_sequence_ + 1); WriteBatchInternal::SetSequence(final, last_sequence + 1);
last_sequence_ += WriteBatchInternal::Count(final); last_sequence += WriteBatchInternal::Count(final);
versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable // Add to log and apply to memtable
status = log_->AddRecord(WriteBatchInternal::Contents(final)); status = log_->AddRecord(WriteBatchInternal::Contents(final));
@ -959,7 +1012,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (options.post_write_snapshot != NULL) { if (options.post_write_snapshot != NULL) {
*options.post_write_snapshot = *options.post_write_snapshot =
status.ok() ? snapshots_.New(last_sequence_) : NULL; status.ok() ? snapshots_.New(last_sequence) : NULL;
} }
} }
if (final != updates) { if (final != updates) {
@ -969,6 +1022,54 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
return status; return status;
} }
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
compacting_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
break;
}
VersionEdit edit;
edit.SetPrevLogNumber(versions_->LogNumber());
edit.SetLogNumber(new_log_number);
s = versions_->LogAndApply(&edit, NULL);
if (!s.ok()) {
delete lfile;
env_->DeleteFile(LogFileName(dbname_, new_log_number));
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
bool DBImpl::HasLargeValues(const WriteBatch& batch) const { bool DBImpl::HasLargeValues(const WriteBatch& batch) const {
if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) { if (WriteBatchInternal::ByteSize(&batch) >= options_.large_value_threshold) {
for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) { for (WriteBatchInternal::Iterator it(batch); !it.Done(); it.Next()) {
@ -1033,9 +1134,10 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
MaybeCompressLargeValue( MaybeCompressLargeValue(
it.value(), &file_bytes, &scratch, &large_ref); it.value(), &file_bytes, &scratch, &large_ref);
InternalKey ikey(it.key(), seq, kTypeLargeValueRef); InternalKey ikey(it.key(), seq, kTypeLargeValueRef);
if (versions_->RegisterLargeValueRef(large_ref, log_number_,ikey)) { if (versions_->RegisterLargeValueRef(
large_ref, versions_->LogNumber(), ikey)) {
// TODO(opt): avoid holding the lock here (but be careful about // TODO(opt): avoid holding the lock here (but be careful about
// another thread doing a Write and changing log_number_ or // another thread doing a Write and switching logs or
// having us get a different "assigned_seq" value). // having us get a different "assigned_seq" value).
uint64_t tmp_number = versions_->NewFileNumber(); uint64_t tmp_number = versions_->NewFileNumber();
@ -1086,7 +1188,9 @@ Status DBImpl::HandleLargeValues(SequenceNumber assigned_seq,
return Status::OK(); return Status::OK();
} }
bool DBImpl::GetProperty(const Slice& property, uint64_t* value) { bool DBImpl::GetProperty(const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_); MutexLock l(&mutex_);
Slice in = property; Slice in = property;
Slice prefix("leveldb."); Slice prefix("leveldb.");
@ -1100,10 +1204,37 @@ bool DBImpl::GetProperty(const Slice& property, uint64_t* value) {
if (!ok || level < 0 || level >= config::kNumLevels) { if (!ok || level < 0 || level >= config::kNumLevels) {
return false; return false;
} else { } else {
*value = versions_->NumLevelFiles(level); char buf[100];
snprintf(buf, sizeof(buf), "%d", versions_->NumLevelFiles(level));
*value = buf;
return true; return true;
} }
} else if (in == "stats") {
char buf[200];
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
"--------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
stats_[level].micros / 1e6,
stats_[level].bytes_read / 1048576.0,
stats_[level].bytes_written / 1048576.0);
value->append(buf);
}
} }
return true;
}
return false; return false;
} }
@ -1158,14 +1289,15 @@ Status DB::Open(const Options& options, const std::string& dbname,
VersionEdit edit; VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) { if (s.ok()) {
impl->log_number_ = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile; WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, impl->log_number_), s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile); &lfile);
if (s.ok()) { if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile; impl->logfile_ = lfile;
impl->log_ = new log::Writer(lfile); impl->log_ = new log::Writer(lfile);
s = impl->Install(&edit, impl->log_number_, NULL); s = impl->versions_->LogAndApply(&edit, NULL);
} }
if (s.ok()) { if (s.ok()) {
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();

@ -36,7 +36,7 @@ class DBImpl : public DB {
virtual Iterator* NewIterator(const ReadOptions&); virtual Iterator* NewIterator(const ReadOptions&);
virtual const Snapshot* GetSnapshot(); virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot); virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, uint64_t* value); virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
@ -72,14 +72,6 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit. // be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit); Status Recover(VersionEdit* edit);
// Apply the specified updates and save the resulting descriptor to
// persistent storage. If cleanup_mem is non-NULL, arrange to
// delete it when all existing snapshots have gone away iff Install()
// returns OK.
Status Install(VersionEdit* edit,
uint64_t new_log_number,
MemTable* cleanup_mem);
void MaybeIgnoreError(Status* s) const; void MaybeIgnoreError(Status* s) const;
// Delete any unneeded files and stale in-memory entries. // Delete any unneeded files and stale in-memory entries.
@ -99,6 +91,7 @@ class DBImpl : public DB {
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit); Status WriteLevel0Table(MemTable* mem, VersionEdit* edit);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
bool HasLargeValues(const WriteBatch& batch) const; bool HasLargeValues(const WriteBatch& batch) const;
// Process data in "*updates" and return a status. "assigned_seq" // Process data in "*updates" and return a status. "assigned_seq"
@ -141,6 +134,7 @@ class DBImpl : public DB {
const InternalKeyComparator internal_comparator_; const InternalKeyComparator internal_comparator_;
const Options options_; // options_.comparator == &internal_comparator_ const Options options_; // options_.comparator == &internal_comparator_
bool owns_info_log_; bool owns_info_log_;
bool owns_cache_;
const std::string dbname_; const std::string dbname_;
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization
@ -154,11 +148,11 @@ class DBImpl : public DB {
port::AtomicPointer shutting_down_; port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when !bg_compaction_scheduled_ port::CondVar bg_cv_; // Signalled when !bg_compaction_scheduled_
port::CondVar compacting_cv_; // Signalled when !compacting_ port::CondVar compacting_cv_; // Signalled when !compacting_
SequenceNumber last_sequence_;
MemTable* mem_; MemTable* mem_;
MemTable* imm_; // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_
WritableFile* logfile_; WritableFile* logfile_;
log::Writer* log_; log::Writer* log_;
uint64_t log_number_;
SnapshotList snapshots_; SnapshotList snapshots_;
// Set of table files to protect from deletion because they are // Set of table files to protect from deletion because they are
@ -176,6 +170,23 @@ class DBImpl : public DB {
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
Status bg_error_; Status bg_error_;
// Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
int64_t micros;
int64_t bytes_read;
int64_t bytes_written;
CompactionStats() : micros(0), bytes_read(0), bytes_written(0) { }
void Add(const CompactionStats& c) {
this->micros += c.micros;
this->bytes_read += c.bytes_read;
this->bytes_written += c.bytes_written;
}
};
CompactionStats stats_[config::kNumLevels];
// No copying allowed // No copying allowed
DBImpl(const DBImpl&); DBImpl(const DBImpl&);
void operator=(const DBImpl&); void operator=(const DBImpl&);

@ -72,19 +72,11 @@ class DBTest {
} }
Status Put(const std::string& k, const std::string& v) { Status Put(const std::string& k, const std::string& v) {
WriteOptions options; return db_->Put(WriteOptions(), k, v);
options.sync = false;
WriteBatch batch;
batch.Put(k, v);
return db_->Write(options, &batch);
} }
Status Delete(const std::string& k) { Status Delete(const std::string& k) {
WriteOptions options; return db_->Delete(WriteOptions(), k);
options.sync = false;
WriteBatch batch;
batch.Delete(k);
return db_->Write(options, &batch);
} }
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) { std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
@ -147,11 +139,11 @@ class DBTest {
} }
int NumTableFilesAtLevel(int level) { int NumTableFilesAtLevel(int level) {
uint64_t val; std::string property;
ASSERT_TRUE( ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level), db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&val)); &property));
return val; return atoi(property.c_str());
} }
uint64_t Size(const Slice& start, const Slice& limit) { uint64_t Size(const Slice& start, const Slice& limit) {
@ -185,10 +177,7 @@ class DBTest {
dbfull()->TEST_CompactMemTable(); dbfull()->TEST_CompactMemTable();
int max_level_with_files = 1; int max_level_with_files = 1;
for (int level = 1; level < config::kNumLevels; level++) { for (int level = 1; level < config::kNumLevels; level++) {
uint64_t v; if (NumTableFilesAtLevel(level) > 0) {
char name[100];
snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
if (dbfull()->GetProperty(name, &v) && v > 0) {
max_level_with_files = level; max_level_with_files = level;
} }
} }
@ -459,7 +448,7 @@ TEST(DBTest, MinorCompactionsHappen) {
options.write_buffer_size = 10000; options.write_buffer_size = 10000;
Reopen(&options); Reopen(&options);
const int N = 100; const int N = 500;
int starting_num_tables = NumTableFilesAtLevel(0); int starting_num_tables = NumTableFilesAtLevel(0);
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
@ -1047,7 +1036,7 @@ class ModelDB: public DB {
return Status::OK(); return Status::OK();
} }
virtual bool GetProperty(const Slice& property, uint64_t* value) { virtual bool GetProperty(const Slice& property, std::string* value) {
return false; return false;
} }
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) { virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {

@ -15,6 +15,12 @@
namespace leveldb { namespace leveldb {
// Grouping of constants. We may want to make some of these
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
}
class InternalKey; class InternalKey;
// Value types encoded as the last component of internal keys. // Value types encoded as the last component of internal keys.

@ -20,15 +20,18 @@ enum Tag {
kDeletedFile = 6, kDeletedFile = 6,
kNewFile = 7, kNewFile = 7,
kLargeValueRef = 8, kLargeValueRef = 8,
kPrevLogNumber = 9,
}; };
void VersionEdit::Clear() { void VersionEdit::Clear() {
comparator_.clear(); comparator_.clear();
log_number_ = 0; log_number_ = 0;
prev_log_number_ = 0;
last_sequence_ = 0; last_sequence_ = 0;
next_file_number_ = 0; next_file_number_ = 0;
has_comparator_ = false; has_comparator_ = false;
has_log_number_ = false; has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false; has_next_file_number_ = false;
has_last_sequence_ = false; has_last_sequence_ = false;
deleted_files_.clear(); deleted_files_.clear();
@ -45,6 +48,10 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kLogNumber); PutVarint32(dst, kLogNumber);
PutVarint64(dst, log_number_); PutVarint64(dst, log_number_);
} }
if (has_prev_log_number_) {
PutVarint32(dst, kPrevLogNumber);
PutVarint64(dst, prev_log_number_);
}
if (has_next_file_number_) { if (has_next_file_number_) {
PutVarint32(dst, kNextFileNumber); PutVarint32(dst, kNextFileNumber);
PutVarint64(dst, next_file_number_); PutVarint64(dst, next_file_number_);
@ -142,6 +149,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
} }
break; break;
case kPrevLogNumber:
if (GetVarint64(&input, &prev_log_number_)) {
has_prev_log_number_ = true;
} else {
msg = "previous log number";
}
break;
case kNextFileNumber: case kNextFileNumber:
if (GetVarint64(&input, &next_file_number_)) { if (GetVarint64(&input, &next_file_number_)) {
has_next_file_number_ = true; has_next_file_number_ = true;
@ -228,6 +243,10 @@ std::string VersionEdit::DebugString() const {
r.append("\n LogNumber: "); r.append("\n LogNumber: ");
AppendNumberTo(&r, log_number_); AppendNumberTo(&r, log_number_);
} }
if (has_prev_log_number_) {
r.append("\n PrevLogNumber: ");
AppendNumberTo(&r, prev_log_number_);
}
if (has_next_file_number_) { if (has_next_file_number_) {
r.append("\n NextFile: "); r.append("\n NextFile: ");
AppendNumberTo(&r, next_file_number_); AppendNumberTo(&r, next_file_number_);

@ -39,6 +39,10 @@ class VersionEdit {
has_log_number_ = true; has_log_number_ = true;
log_number_ = num; log_number_ = num;
} }
void SetPrevLogNumber(uint64_t num) {
has_prev_log_number_ = true;
prev_log_number_ = num;
}
void SetNextFile(uint64_t num) { void SetNextFile(uint64_t num) {
has_next_file_number_ = true; has_next_file_number_ = true;
next_file_number_ = num; next_file_number_ = num;
@ -95,10 +99,12 @@ class VersionEdit {
std::string comparator_; std::string comparator_;
uint64_t log_number_; uint64_t log_number_;
uint64_t prev_log_number_;
uint64_t next_file_number_; uint64_t next_file_number_;
SequenceNumber last_sequence_; SequenceNumber last_sequence_;
bool has_comparator_; bool has_comparator_;
bool has_log_number_; bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_; bool has_next_file_number_;
bool has_last_sequence_; bool has_last_sequence_;

@ -27,17 +27,15 @@ static const int kTargetFileSize = 2 * 1048576;
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize; static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
static double MaxBytesForLevel(int level) { static double MaxBytesForLevel(int level) {
if (level == 0) { // Note: the result for level zero is not really used since we set
return 4 * 1048576.0; // the level-0 compaction threshold based on number of files.
} else { double result = 10 * 1048576.0; // Result for both level-0 and level-1
double result = 10 * 1048576.0;
while (level > 1) { while (level > 1) {
result *= 10; result *= 10;
level--; level--;
} }
return result; return result;
} }
}
static uint64_t MaxFileSizeForLevel(int level) { static uint64_t MaxFileSizeForLevel(int level) {
return kTargetFileSize; // We could vary per level to reduce number of files? return kTargetFileSize; // We could vary per level to reduce number of files?
@ -327,6 +325,9 @@ VersionSet::VersionSet(const std::string& dbname,
icmp_(*cmp), icmp_(*cmp),
next_file_number_(2), next_file_number_(2),
manifest_file_number_(0), // Filled by Recover() manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
log_number_(0),
prev_log_number_(0),
descriptor_file_(NULL), descriptor_file_(NULL),
descriptor_log_(NULL), descriptor_log_(NULL),
current_(new Version(this)), current_(new Version(this)),
@ -345,7 +346,19 @@ VersionSet::~VersionSet() {
} }
Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) { Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}
if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_); edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
Version* v = new Version(this); Version* v = new Version(this);
{ {
@ -372,7 +385,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
} }
} }
// Write new record to log file // Write new record to MANIFEST log
if (s.ok()) { if (s.ok()) {
std::string record; std::string record;
edit->EncodeTo(&record); edit->EncodeTo(&record);
@ -396,6 +409,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
v->next_ = NULL; v->next_ = NULL;
current_->next_ = v; current_->next_ = v;
current_ = v; current_ = v;
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else { } else {
delete v; delete v;
if (!new_manifest_file.empty()) { if (!new_manifest_file.empty()) {
@ -406,13 +421,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, MemTable* cleanup_mem) {
env_->DeleteFile(new_manifest_file); env_->DeleteFile(new_manifest_file);
} }
} }
//Log(env_, options_->info_log, "State\n%s", current_->DebugString().c_str());
return s; return s;
} }
Status VersionSet::Recover(uint64_t* log_number, Status VersionSet::Recover() {
SequenceNumber* last_sequence) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Status* status; Status* status;
virtual void Corruption(size_t bytes, const Status& s) { virtual void Corruption(size_t bytes, const Status& s) {
@ -439,9 +452,13 @@ Status VersionSet::Recover(uint64_t* log_number,
} }
bool have_log_number = false; bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false; bool have_next_file = false;
bool have_last_sequence = false; bool have_last_sequence = false;
uint64_t next_file = 0; uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
Builder builder(this, current_); Builder builder(this, current_);
{ {
@ -467,17 +484,22 @@ Status VersionSet::Recover(uint64_t* log_number,
} }
if (edit.has_log_number_) { if (edit.has_log_number_) {
*log_number = edit.log_number_; log_number = edit.log_number_;
have_log_number = true; have_log_number = true;
} }
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) { if (edit.has_next_file_number_) {
next_file = edit.next_file_number_; next_file = edit.next_file_number_;
have_next_file = true; have_next_file = true;
} }
if (edit.has_last_sequence_) { if (edit.has_last_sequence_) {
*last_sequence = edit.last_sequence_; last_sequence = edit.last_sequence_;
have_last_sequence = true; have_last_sequence = true;
} }
} }
@ -493,6 +515,10 @@ Status VersionSet::Recover(uint64_t* log_number,
} else if (!have_last_sequence) { } else if (!have_last_sequence) {
s = Status::Corruption("no last-sequence-number entry in descriptor"); s = Status::Corruption("no last-sequence-number entry in descriptor");
} }
if (!have_prev_log_number) {
prev_log_number = 0;
}
} }
if (s.ok()) { if (s.ok()) {
@ -508,12 +534,23 @@ Status VersionSet::Recover(uint64_t* log_number,
current_ = v; current_ = v;
manifest_file_number_ = next_file; manifest_file_number_ = next_file;
next_file_number_ = next_file + 1; next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
} }
} }
return s; return s;
} }
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0;
for (int i = 0; i < files.size(); i++) {
sum += files[i]->file_size;
}
return sum;
}
Status VersionSet::Finalize(Version* v) { Status VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction // Precomputed best level for next compaction
int best_level = -1; int best_level = -1;
@ -523,23 +560,24 @@ Status VersionSet::Finalize(Version* v) {
for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) { for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) {
s = SortLevel(v, level); s = SortLevel(v, level);
// Compute the ratio of current size to size limit. double score;
uint64_t level_bytes = 0;
for (int i = 0; i < v->files_[level].size(); i++) {
level_bytes += v->files_[level][i]->file_size;
}
double score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
if (level == 0) { if (level == 0) {
// Level-0 file sizes are going to be often much smaller than // We treat level-0 specially by bounding the number of files
// MaxBytesForLevel(0) since we do not account for compression // instead of number of bytes for two reasons:
// when producing a level-0 file; and too many level-0 files //
// increase merging costs. So use a file-count limit for // (1) With larger write-buffer sizes, it is nice not to do too
// level-0 in addition to the byte-count limit. // many level-0 compactions.
double count_score = v->files_[level].size() / 4.0; //
if (count_score > score) { // (2) The files in level-0 are merged on every read and
score = count_score; // therefore we wish to avoid too many files when the individual
} // file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() / 4.0;
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
} }
if (score > best_score) { if (score > best_score) {
@ -696,8 +734,7 @@ bool VersionSet::RegisterLargeValueRef(const LargeValueRef& large_ref,
return is_first; return is_first;
} }
void VersionSet::CleanupLargeValueRefs(const std::set<uint64_t>& live_tables, void VersionSet::CleanupLargeValueRefs(const std::set<uint64_t>& live_tables) {
uint64_t log_file_num) {
for (LargeValueMap::iterator it = large_value_refs_.begin(); for (LargeValueMap::iterator it = large_value_refs_.begin();
it != large_value_refs_.end(); it != large_value_refs_.end();
) { ) {
@ -705,7 +742,8 @@ void VersionSet::CleanupLargeValueRefs(const std::set<uint64_t>& live_tables,
for (LargeReferencesSet::iterator ref_it = refs->begin(); for (LargeReferencesSet::iterator ref_it = refs->begin();
ref_it != refs->end(); ref_it != refs->end();
) { ) {
if (ref_it->first != log_file_num && // Not in log file if (ref_it->first != log_number_ && // Not in log file
ref_it->first != prev_log_number_ && // Not in prev log
live_tables.count(ref_it->first) == 0) { // Not in a live table live_tables.count(ref_it->first) == 0) { // Not in a live table
// No longer live: erase // No longer live: erase
LargeReferencesSet::iterator to_erase = ref_it; LargeReferencesSet::iterator to_erase = ref_it;
@ -762,12 +800,10 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
} }
} }
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) { int64_t VersionSet::NumLevelBytes(int level) const {
int64_t sum = 0; assert(level >= 0);
for (int i = 0; i < files.size(); i++) { assert(level < config::kNumLevels);
sum += files[i]->file_size; return TotalFileSize(current_->files_[level]);
}
return sum;
} }
int64_t VersionSet::MaxNextLevelOverlappingBytes() { int64_t VersionSet::MaxNextLevelOverlappingBytes() {

@ -24,12 +24,6 @@
namespace leveldb { namespace leveldb {
// Grouping of constants. We may want to make some of these
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
}
namespace log { class Writer; } namespace log { class Writer; }
class Compaction; class Compaction;
@ -107,7 +101,7 @@ class VersionSet {
Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem); Status LogAndApply(VersionEdit* edit, MemTable* cleanup_mem);
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.
Status Recover(uint64_t* log_number, SequenceNumber* last_sequence); Status Recover();
// Save current contents to *log // Save current contents to *log
Status WriteSnapshot(log::Writer* log); Status WriteSnapshot(log::Writer* log);
@ -124,6 +118,25 @@ class VersionSet {
// Return the number of Table files at the specified level. // Return the number of Table files at the specified level.
int NumLevelFiles(int level) const; int NumLevelFiles(int level) const;
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
// Return the last sequence number.
uint64_t LastSequence() const { return last_sequence_; }
// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
}
// Return the current log file number.
uint64_t LogNumber() const { return log_number_; }
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }
// Pick level and inputs for a new compaction. // Pick level and inputs for a new compaction.
// Returns NULL if there is no compaction to be done. // Returns NULL if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that // Otherwise returns a pointer to a heap-allocated object that
@ -168,9 +181,8 @@ class VersionSet {
// Cleanup the large value reference state by eliminating any // Cleanup the large value reference state by eliminating any
// references from files that are not includes in either "live_tables" // references from files that are not includes in either "live_tables"
// or "log_file". // or the current log.
void CleanupLargeValueRefs(const std::set<uint64_t>& live_tables, void CleanupLargeValueRefs(const std::set<uint64_t>& live_tables);
uint64_t log_file_num);
// Returns true if a large value with the given reference is live. // Returns true if a large value with the given reference is live.
bool LargeValueIsLive(const LargeValueRef& large_ref); bool LargeValueIsLive(const LargeValueRef& large_ref);
@ -213,6 +225,9 @@ class VersionSet {
const InternalKeyComparator icmp_; const InternalKeyComparator icmp_;
uint64_t next_file_number_; uint64_t next_file_number_;
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily // Opened lazily
WritableFile* descriptor_file_; WritableFile* descriptor_file_;

@ -63,15 +63,12 @@ Example:
The database provides <code>Put</code>, <code>Delete</code>, and <code>Get</code> methods to The database provides <code>Put</code>, <code>Delete</code>, and <code>Get</code> methods to
modify/query the database. For example, the following code modify/query the database. For example, the following code
moves the value stored under key1 to key2. moves the value stored under key1 to key2.
<p>
<pre> <pre>
std::string value; std::string value;
leveldb::Status s = db-&gt;Get(leveldb::ReadOptions(), key1, &amp;value); leveldb::Status s = db-&gt;Get(leveldb::ReadOptions(), key1, &amp;value);
if (s.ok()) s = db-&gt;Put(leveldb::WriteOptions(), key2, value); if (s.ok()) s = db-&gt;Put(leveldb::WriteOptions(), key2, value);
if (s.ok()) s = db-&gt;Delete(leveldb::WriteOptions(), key1); if (s.ok()) s = db-&gt;Delete(leveldb::WriteOptions(), key1);
</pre> </pre>
See <a href="#async">important performance note</a> below for how to
speed up writes significantly.
<h1>Atomic Updates</h1> <h1>Atomic Updates</h1>
<p> <p>
@ -100,6 +97,47 @@ we do not end up erroneously dropping the value entirely.
Apart from its atomicity benefits, <code>WriteBatch</code> may also be used to Apart from its atomicity benefits, <code>WriteBatch</code> may also be used to
speed up bulk updates by placing lots of individual mutations into the speed up bulk updates by placing lots of individual mutations into the
same batch. same batch.
<h1>Synchronous Writes</h1>
By default, each write to <code>leveldb</code> is asynchronous: it
returns after pushing the write from the process into the operating
system. The transfer from operating system memory to the underlying
persistent storage happens asynchronously. The <code>sync</code> flag
can be turned on for a particular write to make the write operation
not return until the data being written has been pushed all the way to
persistent storage. (On Posix systems, this is implemented by calling
either <code>fsync(...)</code> or <code>fdatasync(...)</code> or
<code>msync(..., MS_SYNC)</code> before the write operation returns.)
<pre>
leveldb::WriteOptions write_options;
write_options.sync = true;
db-&gt;Put(write_options, ...);
</pre>
Asynchronous writes are often more than a thousand times as fast as
synchronous writes. The downside of asynchronous writes is that a
crash of the machine may cause the last few updates to be lost. Note
that a crash of just the writing process (i.e., not a reboot) will not
cause any loss since even when <code>sync</code> is false, an update
is pushed from the process memory into the operating system before it
is considered done.
<p>
Asynchronous writes can often be used safely. For example, when
loading a large amount of data into the database you can handle lost
updates by restarting the bulk load after a crash. A hybrid scheme is
also possible where every Nth write is synchronous, and in the event
of a crash, the bulk load is restarted just after the last synchronous
write finished by the previous run. (The synchronous write can update
a marker that describes where to restart on a crash.)
<p>
<code>WriteBatch</code> provides an alternative to asynchronous writes.
Multiple updates may be placed in the same <code>WriteBatch</code> and
applied together using a synchronous write (i.e.,
<code>write_options.sync</code> is set to true). The extra cost of
the synchronous write will be amortized across all of the writes in
the batch.
<p> <p>
<h1>Concurrency</h1> <h1>Concurrency</h1>
<p> <p>
@ -289,48 +327,12 @@ version numbers found in the keys to decide how to interpret them.
Performance can be tuned by changing the default values of the Performance can be tuned by changing the default values of the
types defined in <code>leveldb/include/options.h</code>. types defined in <code>leveldb/include/options.h</code>.
<p>
<h2><a name="async">Asynchronous Writes</a></h2>
By default, each write to <code>leveldb</code> is synchronous: it does
not return until the write has been pushed from memory to persistent
storage. (On Posix systems, this is implemented by calling either
<code>fdatasync(...)</code> or <code>msync(..., MS_SYNC)</code>.)
<strong>Synchronous writes may be very slow and the synchrony can be
optionally disabled</strong>:
<pre>
leveldb::WriteOptions write_options;
write_options.sync = false;
db-&gt;Put(write_options, ...);
</pre>
Asynchronous writes are often more than a hundred times as fast as
synchronous writes. The downside of asynchronous writes is that a
crash of the machine may cause the last few updates to be lost. Note
that a crash of just the writing process (i.e., not a reboot) will not
cause any loss since even when <code>sync</code> is false, an update
is pushed from the process memory into the operating system before it
is considered done.
<p>
Asynchronous writes can be particularly beneficial when loading a
large amount of data into the database since you can mitigate the
problem of lost updates by restarting the bulk load. A hybrid scheme
is also possible where every Nth write is synchronous, and in the
event of a crash, the bulk load is restarted just after the last
synchronous write finished by the previous run.
<p>
<code>WriteBatch</code> provides an alternative to asynchronous writes.
Multiple updates may be placed in the same <code>WriteBatch</code> and
applied together using a synchronous write. The extra cost of the
synchronous write will be amortized across all of the writes in the batch.
<p> <p>
<h2>Block size</h2> <h2>Block size</h2>
<p> <p>
<code>leveldb</code> groups adjacent keys together into the same block and such a <code>leveldb</code> groups adjacent keys together into the same block and such a
block is the unit of transfer to and from persistent storage. The block is the unit of transfer to and from persistent storage. The
default block size is approximately 8192 uncompressed bytes. default block size is approximately 4096 uncompressed bytes.
Applications that mostly do bulk scans over the contents of the Applications that mostly do bulk scans over the contents of the
database may wish to increase this size. Applications that do a lot database may wish to increase this size. Applications that do a lot
of point reads of small values may wish to switch to a smaller block of point reads of small values may wish to switch to a smaller block

@ -13,7 +13,7 @@
namespace leveldb { namespace leveldb {
static const int kMajorVersion = 1; static const int kMajorVersion = 1;
static const int kMinorVersion = 0; static const int kMinorVersion = 1;
struct Options; struct Options;
struct ReadOptions; struct ReadOptions;
@ -49,7 +49,7 @@ class DB {
// Set the database entry for "key" to "value". Returns OK on success, // Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error. // and a non-OK status on error.
// Note: consider setting options.sync = false. // Note: consider setting options.sync = true.
virtual Status Put(const WriteOptions& options, virtual Status Put(const WriteOptions& options,
const Slice& key, const Slice& key,
const Slice& value) = 0; const Slice& value) = 0;
@ -57,12 +57,12 @@ class DB {
// Remove the database entry (if any) for "key". Returns OK on // Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key" // success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database. // did not exist in the database.
// Note: consider setting options.sync = false. // Note: consider setting options.sync = true.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Apply the specified updates to the database. // Apply the specified updates to the database.
// Returns OK on success, non-OK on failure. // Returns OK on success, non-OK on failure.
// Note: consider setting options.sync = false. // Note: consider setting options.sync = true.
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;
// If the database contains an entry for "key" store the // If the database contains an entry for "key" store the
@ -103,7 +103,9 @@ class DB {
// //
// "leveldb.num-files-at-level<N>" - return the number of files at level <N>, // "leveldb.num-files-at-level<N>" - return the number of files at level <N>,
// where <N> is an ASCII representation of a level number (e.g. "0"). // where <N> is an ASCII representation of a level number (e.g. "0").
virtual bool GetProperty(const Slice& property, uint64_t* value) = 0; // "leveldb.stats" - returns a multi-line string that describes statistics
// about the internal operation of the DB.
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate // For each i in [0,n-1], store in "sizes[i]", the approximate
// file system space used by keys in "[range[i].start .. range[i].limit)". // file system space used by keys in "[range[i].start .. range[i].limit)".

@ -69,15 +69,14 @@ struct Options {
// ------------------- // -------------------
// Parameters that affect performance // Parameters that affect performance
// Amount of data to build up in memory before converting to an // Amount of data to build up in memory (backed by an unsorted log
// on-disk file. // on disk) before converting to a sorted on-disk file.
// //
// Some DB operations may encounter a delay proportional to the size // Larger values increase performance, especially during bulk loads.
// of this parameter. Therefore we recommend against increasing // Up to two write buffers may be held in memory at the same time,
// this parameter unless you are willing to live with an occasional // so you may wish to adjust this parameter to control memory usage.
// slow operation in exchange for faster bulk loading throughput.
// //
// Default: 1MB // Default: 4MB
size_t write_buffer_size; size_t write_buffer_size;
// Number of open files that can be used by the DB. You may need to // Number of open files that can be used by the DB. You may need to
@ -100,7 +99,8 @@ struct Options {
// Control over blocks (user data is stored in a set of blocks, and // Control over blocks (user data is stored in a set of blocks, and
// a block is the unit of reading from disk). // a block is the unit of reading from disk).
// Use the specified cache for blocks (if non-NULL). // If non-NULL, use the specified cache for blocks.
// If NULL, leveldb will automatically create and use an 8MB internal cache.
// Default: NULL // Default: NULL
Cache* block_cache; Cache* block_cache;
@ -109,7 +109,7 @@ struct Options {
// actual size of the unit read from disk may be smaller if // actual size of the unit read from disk may be smaller if
// compression is enabled. This parameter can be changed dynamically. // compression is enabled. This parameter can be changed dynamically.
// //
// Default: 8K // Default: 4K
int block_size; int block_size;
// Number of keys between restart points for delta encoding of keys. // Number of keys between restart points for delta encoding of keys.
@ -177,7 +177,12 @@ struct WriteOptions {
// crashes (i.e., the machine does not reboot), no writes will be // crashes (i.e., the machine does not reboot), no writes will be
// lost even if sync==false. // lost even if sync==false.
// //
// Default: true // In other words, a DB write with sync==false has similar
// crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
//
// Default: false
bool sync; bool sync;
// If "post_write_snapshot" is non-NULL, and the write succeeds, // If "post_write_snapshot" is non-NULL, and the write succeeds,
@ -193,7 +198,7 @@ struct WriteOptions {
const Snapshot** post_write_snapshot; const Snapshot** post_write_snapshot;
WriteOptions() WriteOptions()
: sync(true), : sync(false),
post_write_snapshot(NULL) { post_write_snapshot(NULL) {
} }
}; };

@ -24,7 +24,6 @@ int fdatasync(int fd) {
} }
} }
// TODO(gabor): This is copied from port_posix.cc - not sure if I should do this?
namespace leveldb { namespace leveldb {
namespace port { namespace port {

@ -15,6 +15,20 @@
#include <string> #include <string>
#include <cctype> #include <cctype>
// Collapse the plethora of ARM flavors available to an easier to manage set
// Defs reference is at https://wiki.edubuntu.org/ARM/Thumb2PortingHowto
#if defined(__ARM_ARCH_6__) || \
defined(__ARM_ARCH_6J__) || \
defined(__ARM_ARCH_6K__) || \
defined(__ARM_ARCH_6Z__) || \
defined(__ARM_ARCH_6T2__) || \
defined(__ARM_ARCH_6ZK__) || \
defined(__ARM_ARCH_7__) || \
defined(__ARM_ARCH_7R__) || \
defined(__ARM_ARCH_7A__)
#define ARMV6_OR_7 1
#endif
extern "C" { extern "C" {
size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d); size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d);
size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d); size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d);
@ -61,28 +75,50 @@ class CondVar {
pthread_cond_t cv_; pthread_cond_t cv_;
}; };
#ifndef ARMV6_OR_7
// On ARM chipsets <V6, 0xffff0fa0 is the hard coded address of a
// memory barrier function provided by the kernel.
typedef void (*LinuxKernelMemoryBarrierFunc)(void);
LinuxKernelMemoryBarrierFunc pLinuxKernelMemoryBarrier ATTRIBUTE_WEAK =
(LinuxKernelMemoryBarrierFunc) 0xffff0fa0;
#endif
// Storage for a lock-free pointer // Storage for a lock-free pointer
class AtomicPointer { class AtomicPointer {
private: private:
std::atomic<void*> rep_; void* rep_;
inline void MemoryBarrier() const {
// TODO(gabor): This only works on Android instruction sets >= V6
#ifdef ARMV6_OR_7
__asm__ __volatile__("dmb" : : : "memory");
#else
pLinuxKernelMemoryBarrier();
#endif
}
public: public:
AtomicPointer() { } AtomicPointer() { }
explicit AtomicPointer(void* v) : rep_(v) { } explicit AtomicPointer(void* v) : rep_(v) { }
inline void* Acquire_Load() const { inline void* Acquire_Load() const {
return rep_.load(std::memory_order_acquire); void* r = rep_;
MemoryBarrier();
return r;
} }
inline void Release_Store(void* v) { inline void Release_Store(void* v) {
rep_.store(v, std::memory_order_release); MemoryBarrier();
rep_ = v;
} }
inline void* NoBarrier_Load() const { inline void* NoBarrier_Load() const {
return rep_.load(std::memory_order_relaxed); void* r = rep_;
return r;
} }
inline void NoBarrier_Store(void* v) { inline void NoBarrier_Store(void* v) {
rep_.store(v, std::memory_order_relaxed); rep_ = v;
} }
}; };
// TODO(gabor): Implement actual compress // TODO(gabor): Implement compress
inline bool Snappy_Compress( inline bool Snappy_Compress(
const char* input, const char* input,
size_t input_length, size_t input_length,
@ -90,7 +126,7 @@ inline bool Snappy_Compress(
return false; return false;
} }
// TODO(gabor): Implement actual uncompress // TODO(gabor): Implement uncompress
inline bool Snappy_Uncompress( inline bool Snappy_Uncompress(
const char* input_data, const char* input_data,
size_t input_length, size_t input_length,

@ -725,10 +725,10 @@ TEST(Harness, RandomizedLongDB) {
Test(&rnd); Test(&rnd);
// We must have created enough data to force merging // We must have created enough data to force merging
uint64_t l0_files, l1_files; std::string l0_files, l1_files;
ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files)); ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files));
ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files)); ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files));
ASSERT_GT(l0_files + l1_files, 0); ASSERT_GT(atoi(l0_files.c_str()) + atoi(l1_files.c_str()), 0);
} }

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save