Merge branch 'master' into performance

Conflicts:
	include/leveldb/options.h
	include/leveldb/statistics.h
	util/options.cc
main
Dhruba Borthakur 12 years ago
commit 711a30cb30
  1. 4
      Makefile
  2. 25
      README
  3. 1
      db/corruption_test.cc
  4. 8
      db/db_bench.cc
  5. 241
      db/db_impl.cc
  6. 41
      db/db_impl.h
  7. 3
      db/db_impl_readonly.h
  8. 83
      db/db_test.cc
  9. 64
      db/memtable.cc
  10. 35
      db/memtable.h
  11. 4
      db/memtablelist.cc
  12. 3
      db/memtablelist.h
  13. 30
      db/merge_test.cc
  14. 3
      db/repair.cc
  15. 3
      db/skiplist.h
  16. 14
      db/skiplist_test.cc
  17. 102
      db/skiplistrep.h
  18. 22
      db/table_cache.cc
  19. 10
      db/table_cache.h
  20. 21
      db/version_set.cc
  21. 3
      db/version_set.h
  22. 43
      db/write_batch.cc
  23. 9
      db/write_batch_internal.h
  24. 5
      db/write_batch_test.cc
  25. 38
      include/leveldb/arena.h
  26. 30
      include/leveldb/db.h
  27. 88
      include/leveldb/memtablerep.h
  28. 31
      include/leveldb/options.h
  29. 15
      include/leveldb/statistics.h
  30. 24
      table/table.cc
  31. 5
      table/table.h
  32. 13
      table/table_test.cc
  33. 3
      tools/db_crashtest.py
  34. 3
      tools/db_crashtest2.py
  35. 12
      tools/db_stress.cc
  36. 27
      util/arena_impl.cc
  37. 43
      util/arena_impl.h
  38. 59
      util/arena_test.cc
  39. 12
      util/options.cc
  40. 42
      utilities/ttl/db_ttl.cc
  41. 66
      utilities/ttl/db_ttl.h

@ -240,8 +240,8 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
merge_test: db/merge_test.o $(LIBOBJECTS)
$(CXX) db/merge_test.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
$(MEMENVLIBRARY) : $(MEMENVOBJECTS)
rm -f $@

@ -1,5 +1,7 @@
rocksdb: A persistent key-value store for flash storage
Authors: The Facebook Database Engineering Team
Authors: * The Facebook Database Engineering Team
* Build on earlier work on leveldb by Sanjay Ghemawat
(sanjay@google.com) and Jeff Dean (jeff@google.com)
This code is a library that forms the core building block for a fast
key value server, especially suited for storing data on flash drives.
@ -56,6 +58,25 @@ include/env.h
Abstraction of the OS environment. A posix implementation of
this interface is in util/env_posix.cc
include/table.h
include/table_builder.h
Lower-level modules that most clients probably won't use directly
include/cache.h
An API for the block cache.
include/compaction_filter.h
An API for a application filter invoked on every compaction.
include/filter_policy.h
An API for configuring a bloom filter.
include/memtablerep.h
An API for implementing a memtable.
include/statistics.h
An API to retrieve various database statistics.
include/transaction_log_iterator.h
An API to retrieve transaction logs from a database.

@ -57,6 +57,7 @@ class CorruptionTest {
opt.env = &env_;
opt.block_cache = tiny_cache_;
opt.block_size_deviation = 0;
opt.arena_block_size = 4096;
return DB::Open(opt, dbname_, &db_);
}

@ -338,7 +338,7 @@ static auto FLAGS_bytes_per_sync =
leveldb::Options().bytes_per_sync;
// On true, deletes use bloom-filter and drop the delete if key not present
static bool FLAGS_deletes_check_filter_first = false;
static bool FLAGS_filter_deletes = false;
namespace leveldb {
@ -1128,7 +1128,7 @@ unique_ptr<char []> GenerateKeyFromInt(int v, const char* suffix = "")
options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier;
options.deletes_check_filter_first = FLAGS_deletes_check_filter_first;
options.filter_deletes = FLAGS_filter_deletes;
if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional.size() !=
(unsigned int)FLAGS_num_levels) {
@ -2246,9 +2246,9 @@ int main(int argc, char** argv) {
FLAGS_keys_per_multiget = n;
} else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) {
FLAGS_bytes_per_sync = l;
} else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk)
} else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
== 1 && (n == 0 || n ==1 )) {
FLAGS_deletes_check_filter_first = n;
FLAGS_filter_deletes = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

@ -129,6 +129,12 @@ Options SanitizeOptions(const std::string& dbname,
((size_t)64)<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20);
// if user sets arena_block_size, we trust user to use this value. Otherwise,
// calculate a proper value from writer_buffer_size;
if (result.arena_block_size <= 0) {
result.arena_block_size = result.write_buffer_size / 10;
}
result.min_write_buffer_number_to_merge = std::min(
result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1);
if (result.info_log == nullptr) {
@ -164,7 +170,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_, NumberLevels())),
mem_rep_factory_(options_.memtable_factory),
mem_(new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_)),
logfile_number_(0),
tmp_batch_(),
bg_compaction_scheduled_(0),
@ -178,20 +186,28 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
stall_level0_slowdown_(0),
stall_memtable_compaction_(0),
stall_level0_num_files_(0),
stall_level0_slowdown_count_(0),
stall_memtable_compaction_count_(0),
stall_level0_num_files_count_(0),
started_at_(options.env->NowMicros()),
flush_on_destroy_(false),
stats_(options.num_levels),
delayed_writes_(0),
last_flushed_sequence_(0),
storage_options_(options) {
storage_options_(options),
bg_work_gate_closed_(false),
refitting_level_(false) {
mem_->Ref();
env_->GetAbsolutePath(dbname, &db_absolute_path_);
stall_leveln_slowdown_.resize(options.num_levels);
for (int i = 0; i < options.num_levels; ++i)
stall_leveln_slowdown_count_.resize(options.num_levels);
for (int i = 0; i < options.num_levels; ++i) {
stall_leveln_slowdown_[i] = 0;
stall_leveln_slowdown_count_[i] = 0;
}
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options_.max_open_files - 10;
@ -687,10 +703,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
mem = new MemTable(internal_comparator_, NumberLevels());
mem = new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem);
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
@ -904,7 +921,8 @@ Status DBImpl::CompactMemTable(bool* madeProgress) {
return s;
}
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
void DBImpl::CompactRange(const Slice* begin, const Slice* end,
bool reduce_level) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
@ -919,6 +937,79 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
if (reduce_level) {
ReFitLevel(max_level_with_files);
}
}
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) {
mutex_.AssertHeld();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
if (versions_->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break;
minimum_level = i;
}
return minimum_level;
}
void DBImpl::ReFitLevel(int level) {
assert(level < NumberLevels());
MutexLock l(&mutex_);
// only allow one thread refitting
if (refitting_level_) {
Log(options_.info_log, "ReFitLevel: another thread is refitting");
return;
}
refitting_level_ = true;
// wait for all background threads to stop
bg_work_gate_closed_ = true;
while (bg_compaction_scheduled_ > 0) {
Log(options_.info_log,
"RefitLevel: waiting for background threads to stop: %d",
bg_compaction_scheduled_);
bg_cv_.Wait();
}
// move to a smaller level
int to_level = FindMinimumEmptyLevelFitting(level);
assert(to_level <= level);
if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s",
versions_->current()->DebugString().data());
VersionEdit edit(NumberLevels());
for (const auto& f : versions_->current()->files_[level]) {
edit.DeleteFile(level, f->number);
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
}
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
auto status = versions_->LogAndApply(&edit, &mutex_);
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
if (status.ok()) {
Log(options_.info_log, "After refitting:\n%s",
versions_->current()->DebugString().data());
}
}
refitting_level_ = false;
bg_work_gate_closed_ = false;
}
int DBImpl::NumberLevels() {
@ -1242,7 +1333,9 @@ Status DBImpl::TEST_WaitForCompact() {
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
if (bg_work_gate_closed_) {
// gate closed for backgrond work
} else if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
@ -2019,13 +2112,11 @@ Status DBImpl::Get(const ReadOptions& options,
return GetImpl(options, key, value);
}
// If no_IO is true, then returns Status::NotFound if key is not in memtable,
// immutable-memtable and bloom-filters can guarantee that key is not in db,
// "value" is garbage string if no_IO is true
Status DBImpl::GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
const bool no_IO) {
const bool no_io,
bool* value_found) {
Status s;
StopWatch sw(env_, options_.statistics, DB_GET);
@ -2054,12 +2145,12 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// value will contain the current merge operand in the latter case.
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, options_, no_IO)) {
if (mem->Get(lkey, value, &s, options_)) {
// Done
} else if (imm.Get(lkey, value, &s, options_, no_IO)) {
} else if (imm.Get(lkey, value, &s, options_)) {
// Done
} else {
current->Get(options, lkey, value, &s, &stats, options_, no_IO);
current->Get(options, lkey, value, &s, &stats, options_, no_io,value_found);
have_stat_update = true;
}
mutex_.Lock();
@ -2149,10 +2240,14 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
return statList;
}
bool DBImpl::KeyMayExist(const Slice& key) {
std::string value;
const Status s = GetImpl(ReadOptions(), key, &value, true);
return !s.IsNotFound();
bool DBImpl::KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found) {
if (value_found != nullptr) {
*value_found = true; // falsify later if key-may-exist but can't fetch value
}
return GetImpl(options, key, value, true, value_found).ok();
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -2190,10 +2285,6 @@ Status DBImpl::Merge(const WriteOptions& o, const Slice& key,
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
if (options_.deletes_check_filter_first && !KeyMayExist(key)) {
RecordTick(options_.statistics, NUMBER_FILTERED_DELETES);
return Status::OK();
}
return DB::Delete(options, key);
}
@ -2252,7 +2343,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this,
options_.filter_deletes);
if (!status.ok()) {
// Panic for in-memory corruptions
// Note that existing logic was not sound. Any partial failure writing
@ -2341,6 +2433,40 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
return result;
}
// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
// if num_level_files < level0_slowdown_writes_trigger, return 0;
// if num_level_files >= level0_stop_writes_trigger, return 1000;
// otherwise, let r = (num_level_files - level0_slowdown) /
// (level0_stop - level0_slowdown)
// and return r^2 * 1000.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
uint64_t DBImpl::SlowdownAmount(int num_level0_files) {
uint64_t delay;
int stop_trigger = options_.level0_stop_writes_trigger;
int slowdown_trigger = options_.level0_slowdown_writes_trigger;
if (num_level0_files >= stop_trigger) {
delay = 1000;
}
else if (num_level0_files < slowdown_trigger) {
delay = 0;
}
else {
// If we are here, we know that:
// slowdown_trigger <= num_level0_files < stop_trigger
// since the previous two conditions are false.
float how_much =
(float) (num_level0_files - slowdown_trigger) /
(stop_trigger - slowdown_trigger);
delay = how_much * how_much * 1000;
}
assert(delay <= 1000);
return delay;
}
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
@ -2364,15 +2490,19 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// individual write by 0-1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
mutex_.Unlock();
uint64_t t1 = env_->NowMicros();
env_->SleepForMicroseconds(1000);
uint64_t delayed = env_->NowMicros() - t1;
uint64_t delayed;
{
StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT);
env_->SleepForMicroseconds(SlowdownAmount(versions_->NumLevelFiles(0)));
delayed = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed);
stall_level0_slowdown_ += delayed;
stall_level0_slowdown_count_++;
allow_delay = false; // Do not delay a single write more than once
//Log(options_.info_log,
// "delaying write %llu usecs for level0_slowdown_writes_trigger\n",
@ -2391,21 +2521,30 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// ones are still being compacted, so we wait.
DelayLoggingAndReset();
Log(options_.info_log, "wait for memtable compaction...\n");
uint64_t t1 = env_->NowMicros();
bg_cv_.Wait();
const uint64_t stall = env_->NowMicros() -t1;
uint64_t stall;
{
StopWatch sw(env_, options_.statistics,
STALL_MEMTABLE_COMPACTION_COUNT);
bg_cv_.Wait();
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall);
stall_memtable_compaction_ += stall;
stall_memtable_compaction_count_++;
} else if (versions_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
// There are too many level-0 files.
DelayLoggingAndReset();
uint64_t t1 = env_->NowMicros();
Log(options_.info_log, "wait for fewer level0 files...\n");
bg_cv_.Wait();
const uint64_t stall = env_->NowMicros() - t1;
uint64_t stall;
{
StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT);
bg_cv_.Wait();
stall = sw.ElapsedMicros();
}
RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall);
stall_level0_num_files_ += stall;
stall_level0_num_files_count_++;
} else if (
allow_rate_limit_delay &&
options_.rate_limit > 1.0 &&
@ -2413,10 +2552,14 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Delay a write when the compaction score for any level is too large.
int max_level = versions_->MaxCompactionScoreLevel();
mutex_.Unlock();
uint64_t t1 = env_->NowMicros();
env_->SleepForMicroseconds(1000);
uint64_t delayed = env_->NowMicros() - t1;
uint64_t delayed;
{
StopWatch sw(env_, options_.statistics, RATE_LIMIT_DELAY_COUNT);
env_->SleepForMicroseconds(1000);
delayed = sw.ElapsedMicros();
}
stall_leveln_slowdown_[max_level] += delayed;
stall_leveln_slowdown_count_[max_level]++;
// Make sure the following value doesn't round to zero.
uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
rate_limit_delay_millis += rate_limit;
@ -2454,7 +2597,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
log_.reset(new log::Writer(std::move(lfile)));
mem_->SetLogNumber(logfile_number_);
imm_.Add(mem_);
mem_ = new MemTable(internal_comparator_, NumberLevels());
mem_ = new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
@ -2510,6 +2654,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
// Add "+1" to make sure seconds_up is > 0 and avoid NaN later
double seconds_up = (micros_up + 1) / 1000000.0;
uint64_t total_slowdown = 0;
uint64_t total_slowdown_count = 0;
uint64_t interval_bytes_written = 0;
uint64_t interval_bytes_read = 0;
uint64_t interval_bytes_new = 0;
@ -2518,8 +2663,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
// Pardon the long line but I think it is easier to read this way.
snprintf(buf, sizeof(buf),
" Compactions\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall\n"
"----------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
"Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n"
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < NumberLevels(); level++) {
@ -2539,7 +2684,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
snprintf(
buf, sizeof(buf),
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f\n",
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
@ -2561,8 +2706,10 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
stats_[level].files_out_levelnp1,
stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1,
stats_[level].count,
stall_leveln_slowdown_[level] / 1000000.0);
stall_leveln_slowdown_[level] / 1000000.0,
(unsigned long) stall_leveln_slowdown_count_[level]);
total_slowdown += stall_leveln_slowdown_[level];
total_slowdown_count += stall_leveln_slowdown_count_[level];
value->append(buf);
}
}
@ -2638,6 +2785,15 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
total_slowdown / 1000000.0);
value->append(buf);
snprintf(buf, sizeof(buf),
"Stalls(count): %lu level0_slowdown, %lu level0_numfiles, "
"%lu memtable_compaction, %lu leveln_slowdown\n",
(unsigned long) stall_level0_slowdown_count_,
(unsigned long) stall_level0_num_files_count_,
(unsigned long) stall_memtable_compaction_count_,
(unsigned long) total_slowdown_count);
value->append(buf);
last_stats_.bytes_read_ = total_bytes_read;
last_stats_.bytes_written_ = total_bytes_written;
last_stats_.bytes_new_ = stats_[0].bytes_written;
@ -2708,8 +2864,7 @@ Status DB::Merge(const WriteOptions& opt, const Slice& key,
DB::~DB() { }
Status DB::Open(const Options& options, const std::string& dbname,
DB** dbptr) {
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
EnvOptions soptions;

@ -18,6 +18,7 @@
#include "port/port.h"
#include "util/stats_logger.h"
#include "memtablelist.h"
#include "leveldb/memtablerep.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
@ -49,15 +50,21 @@ class DBImpl : public DB {
const std::vector<Slice>& keys,
std::vector<std::string>* values);
// Returns false if key can't exist- based on memtable, immutable-memtable and
// bloom-filters; true otherwise. No IO is performed
virtual bool KeyMayExist(const Slice& key);
// Returns false if key doesn't exist in the database and true if it may.
// If value_found is not passed in as null, then return the value if found in
// memory. On return, if value was found, then value_found will be set to true
// , otherwise false.
virtual bool KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr);
virtual Iterator* NewIterator(const ReadOptions&);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
@ -159,6 +166,7 @@ class DBImpl : public DB {
Status WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
uint64_t* filenumber);
uint64_t SlowdownAmount(int num_level0_files);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
WriteBatch* BuildBatchGroup(Writer** last_writer);
@ -221,6 +229,14 @@ class DBImpl : public DB {
// dump leveldb.stats to LOG
void MaybeDumpStats();
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(int level);
// Move the files in the input level to the minimum level that could hold
// the data set.
void ReFitLevel(int level);
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
bool owns_info_log_;
@ -235,6 +251,7 @@ class DBImpl : public DB {
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
port::CondVar bg_cv_; // Signalled when background work finishes
std::shared_ptr<MemTableRepFactory> mem_rep_factory_;
MemTable* mem_;
MemTableList imm_; // Memtable that are not changing
uint64_t logfile_number_;
@ -293,6 +310,10 @@ class DBImpl : public DB {
uint64_t stall_memtable_compaction_;
uint64_t stall_level0_num_files_;
std::vector<uint64_t> stall_leveln_slowdown_;
uint64_t stall_level0_slowdown_count_;
uint64_t stall_memtable_compaction_count_;
uint64_t stall_level0_num_files_count_;
std::vector<uint64_t> stall_leveln_slowdown_count_;
// Time at which this instance was started.
const uint64_t started_at_;
@ -370,6 +391,12 @@ class DBImpl : public DB {
// The options to access storage files
const EnvOptions storage_options_;
// A value of true temporarily disables scheduling of background work
bool bg_work_gate_closed_;
// Guard against multiple concurrent refitting
bool refitting_level_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
@ -384,11 +411,13 @@ class DBImpl : public DB {
std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
// Function that Get and KeyMayExist call with no_IO true or false
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
const bool no_IO = false);
const bool no_io = false,
bool* value_found = nullptr);
};
// Sanitize db options. The caller should delete result.info_log if

@ -47,7 +47,8 @@ public:
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual void CompactRange(const Slice* begin, const Slice* end) {
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false) {
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");

@ -291,7 +291,7 @@ class DBTest {
// TODO -- test more options
break;
case kDeletesFilterFirst:
options.deletes_check_filter_first = true;
options.filter_deletes = true;
break;
default:
break;
@ -772,39 +772,84 @@ TEST(DBTest, GetEncountersEmptyLevel) {
} while (ChangeOptions());
}
// KeyMayExist-API returns false if memtable(s) and in-memory bloom-filters can
// guarantee that the key doesn't exist in the db, else true. This can lead to
// a few false positives, but not false negatives. To make test deterministic,
// use a much larger number of bits per key-20 than bits in the key, so
// that false positives are eliminated
// KeyMayExist can lead to a few false positives, but not false negatives.
// To make test deterministic, use a much larger number of bits per key-20 than
// bits in the key, so that false positives are eliminated
TEST(DBTest, KeyMayExist) {
do {
ReadOptions ropts;
std::string value;
Options options = CurrentOptions();
options.filter_policy = NewBloomFilterPolicy(20);
Reopen(&options);
ASSERT_TRUE(!db_->KeyMayExist("a"));
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
ASSERT_OK(db_->Put(WriteOptions(), "a", "b"));
ASSERT_TRUE(db_->KeyMayExist("a"));
bool value_found = false;
ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found));
ASSERT_TRUE(value_found);
ASSERT_EQ("b", value);
dbfull()->Flush(FlushOptions());
ASSERT_TRUE(db_->KeyMayExist("a"));
value.clear();
value_found = false;
ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found));
ASSERT_TRUE(value_found);
ASSERT_EQ("b", value);
ASSERT_OK(db_->Delete(WriteOptions(), "a"));
ASSERT_TRUE(!db_->KeyMayExist("a"));
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
dbfull()->Flush(FlushOptions());
dbfull()->CompactRange(nullptr, nullptr);
ASSERT_TRUE(!db_->KeyMayExist("a"));
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
ASSERT_OK(db_->Delete(WriteOptions(), "c"));
ASSERT_TRUE(!db_->KeyMayExist("c"));
ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value));
delete options.filter_policy;
} while (ChangeOptions());
}
// A delete is skipped for key if KeyMayExist(key) returns False
// Tests Writebatch consistency and proper delete behaviour
TEST(DBTest, FilterDeletes) {
Options options = CurrentOptions();
options.filter_policy = NewBloomFilterPolicy(20);
options.filter_deletes = true;
Reopen(&options);
WriteBatch batch;
batch.Delete("a");
dbfull()->Write(WriteOptions(), &batch);
ASSERT_EQ(AllEntriesFor("a"), "[ ]"); // Delete skipped
batch.Clear();
batch.Put("a", "b");
batch.Delete("a");
dbfull()->Write(WriteOptions(), &batch);
ASSERT_EQ(Get("a"), "NOT_FOUND");
ASSERT_EQ(AllEntriesFor("a"), "[ DEL, b ]"); // Delete issued
batch.Clear();
batch.Delete("c");
batch.Put("c", "d");
dbfull()->Write(WriteOptions(), &batch);
ASSERT_EQ(Get("c"), "d");
ASSERT_EQ(AllEntriesFor("c"), "[ d ]"); // Delete skipped
batch.Clear();
dbfull()->Flush(FlushOptions()); // A stray Flush
batch.Delete("c");
dbfull()->Write(WriteOptions(), &batch);
ASSERT_EQ(AllEntriesFor("c"), "[ DEL, d ]"); // Delete issued
batch.Clear();
delete options.filter_policy;
}
TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions());
@ -3007,7 +3052,13 @@ class ModelDB: public DB {
Status::NotSupported("Not implemented."));
return s;
}
virtual bool KeyMayExist(const Slice& key) {
virtual bool KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr) {
if (value_found != nullptr) {
*value_found = false;
}
return true; // Not Supported directly
}
virtual Iterator* NewIterator(const ReadOptions& options) {
@ -3058,7 +3109,8 @@ class ModelDB: public DB {
sizes[i] = 0;
}
}
virtual void CompactRange(const Slice* start, const Slice* end) {
virtual void CompactRange(const Slice* start, const Slice* end,
bool reduce_level ) {
}
virtual int NumberLevels()
@ -3191,6 +3243,9 @@ static bool CompareIterators(int step,
TEST(DBTest, Randomized) {
Random rnd(test::RandomSeed());
do {
if (CurrentOptions().filter_deletes) {
ChangeOptions(); // DBTest.Randomized not suited for filter_deletes
}
ModelDB model(CurrentOptions());
const int N = 10000;
const Snapshot* model_snap = nullptr;

@ -3,6 +3,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h"
#include <memory>
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
@ -19,23 +22,28 @@ static Slice GetLengthPrefixedSlice(const char* data) {
return Slice(p, len);
}
MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel)
MemTable::MemTable(const InternalKeyComparator& cmp,
std::shared_ptr<MemTableRepFactory> table_factory,
int numlevel,
const Options& options)
: comparator_(cmp),
refs_(0),
table_(comparator_, &arena_),
arena_impl_(options.arena_block_size),
table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
edit_(numlevel),
first_seqno_(0),
mem_logfile_number_(0) {
}
mem_logfile_number_(0) { }
MemTable::~MemTable() {
assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
size_t MemTable::ApproximateMemoryUsage() {
return arena_impl_.ApproximateMemoryUsage();
}
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
const {
@ -57,24 +65,27 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
virtual void Prev() { iter_.Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
explicit MemTableIterator(MemTableRep* table)
: iter_(table->GetIterator()) { }
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Next() { iter_->Next(); }
virtual void Prev() { iter_->Prev(); }
virtual Slice key() const {
return GetLengthPrefixedSlice(iter_->key());
}
virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::shared_ptr<MemTableRep::Iterator> iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
@ -83,7 +94,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
return new MemTableIterator(table_.get());
}
void MemTable::Add(SequenceNumber s, ValueType type,
@ -100,7 +111,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len);
char* buf = arena_impl_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
@ -109,7 +120,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((p + val_size) - buf == (unsigned)encoded_len);
table_.Insert(buf);
table_->Insert(buf);
// The first sequence number inserted into the memtable
assert(first_seqno_ == 0 || s > first_seqno_);
@ -119,10 +130,10 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
const Options& options, const bool check_presence_only) {
const Options& options) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
std::shared_ptr<MemTableRep::Iterator> iter(table_.get()->GetIterator());
iter->Seek(memkey.data());
bool merge_in_progress = false;
std::string operand;
@ -131,10 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
merge_in_progress = true;
}
auto merge_operator = options.merge_operator;
auto logger = options.info_log;
for (; iter.Valid(); iter.Next()) {
for (; iter->Valid(); iter->Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
@ -144,7 +154,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
const char* entry = iter->key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
@ -164,10 +174,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
return true;
}
case kTypeMerge: {
if (check_presence_only) {
*s = Status::OK();
return true;
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
if (merge_in_progress) {
merge_operator->Merge(key.user_key(), &v, operand,

@ -6,24 +6,34 @@
#define STORAGE_LEVELDB_DB_MEMTABLE_H_
#include <string>
#include <memory>
#include "leveldb/db.h"
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "db/version_set.h"
#include "util/arena.h"
#include "leveldb/memtablerep.h"
#include "util/arena_impl.h"
namespace leveldb {
class InternalKeyComparator;
class Mutex;
class MemTableIterator;
class MemTable {
public:
struct KeyComparator : public MemTableRep::KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
virtual int operator()(const char* a, const char* b) const;
};
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
int numlevel = 7);
explicit MemTable(
const InternalKeyComparator& comparator,
std::shared_ptr<MemTableRepFactory> table_factory,
int numlevel = 7,
const Options& options = Options());
// Increase reference count.
void Ref() { ++refs_; }
@ -63,13 +73,12 @@ class MemTable {
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// If memtable contains Merge operation as the most recent entry for a key,
// and if check_presence_only is set, return true with Status::OK,
// else if the merge process does not stop (not reaching a value or delete),
// and the merge process does not stop (not reaching a value or delete),
// store the current merged result in value and MergeInProgress in s.
// return false
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s,
const Options& options, const bool check_presence_only = false);
const Options& options);
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
@ -88,22 +97,14 @@ class MemTable {
private:
~MemTable(); // Private since only Unref() should be used to delete it
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { }
int operator()(const char* a, const char* b) const;
};
friend class MemTableIterator;
friend class MemTableBackwardIterator;
friend class MemTableList;
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
ArenaImpl arena_impl_;
shared_ptr<MemTableRep> table_;
// These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush

@ -194,10 +194,10 @@ size_t MemTableList::ApproximateMemoryUsage() {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s,
const Options& options, const bool check_presence_only) {
const Options& options) {
for (list<MemTable*>::iterator it = memlist_.begin();
it != memlist_.end(); ++it) {
if ((*it)->Get(key, value, s, options, check_presence_only)) {
if ((*it)->Get(key, value, s, options)) {
return true;
}
}

@ -8,7 +8,6 @@
#include "leveldb/db.h"
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "util/arena.h"
#include "memtable.h"
namespace leveldb {
@ -71,7 +70,7 @@ class MemTableList {
// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool Get(const LookupKey& key, std::string* value, Status* s,
const Options& options, const bool check_presence_only = false);
const Options& options);
// Returns the list of underlying memtables.
void GetMemTables(std::vector<MemTable*>* list);

@ -8,19 +8,29 @@
#include "leveldb/env.h"
#include "leveldb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "utilities/merge_operators.h"
#include "util/testharness.h"
#include "utilities/utility_db.h"
using namespace std;
using namespace leveldb;
auto mergeOperator = MergeOperators::CreateUInt64AddOperator();
std::shared_ptr<DB> OpenDb() {
std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
DB* db;
Options options;
options.create_if_missing = true;
options.merge_operator = mergeOperator.get();
Status s = DB::Open(options, "/tmp/testdb", &db);
Status s;
DestroyDB(dbname, Options());
if (ttl) {
cout << "Opening database with TTL\n";
s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db);
} else {
s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db);
}
if (!s.ok()) {
cerr << s.ToString() << endl;
assert(false);
@ -45,7 +55,7 @@ class Counters {
uint64_t default_;
public:
Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
explicit Counters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
: db_(db),
put_option_(),
get_option_(),
@ -143,7 +153,7 @@ class MergeBasedCounters : public Counters {
WriteOptions merge_option_; // for merge
public:
MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
explicit MergeBasedCounters(std::shared_ptr<DB> db, uint64_t defaultCount = 0)
: Counters(db, defaultCount),
merge_option_() {
}
@ -227,9 +237,8 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
}
}
int main(int argc, char *argv[]) {
auto db = OpenDb();
void runTest(int argc, const string& dbname, const bool use_ttl = false) {
auto db = OpenDb(dbname, use_ttl);
{
cout << "Test read-modify-write counters... \n";
@ -249,5 +258,12 @@ int main(int argc, char *argv[]) {
testCounters(counters, db.get(), compact);
}
DestroyDB(dbname, Options());
}
int main(int argc, char *argv[]) {
//TODO: Make this test like a general rocksdb unit-test
runTest(argc, "/tmp/testdb");
runTest(argc, "/tmp/testdbttl", true); // Run test on TTL database
return 0;
}

@ -192,7 +192,8 @@ class Repairer {
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_.num_levels);
MemTable* mem = new MemTable(icmp_, options_.memtable_factory,
options_.num_levels);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {

@ -31,13 +31,10 @@
#include <assert.h>
#include <stdlib.h>
#include "port/port.h"
#include "util/arena.h"
#include "util/random.h"
namespace leveldb {
class Arena;
template<typename Key, class Comparator>
class SkipList {
private:

@ -5,7 +5,7 @@
#include "db/skiplist.h"
#include <set>
#include "leveldb/env.h"
#include "util/arena.h"
#include "util/arena_impl.h"
#include "util/hash.h"
#include "util/random.h"
#include "util/testharness.h"
@ -29,9 +29,9 @@ struct TestComparator {
class SkipTest { };
TEST(SkipTest, Empty) {
Arena arena;
ArenaImpl arena_impl;
TestComparator cmp;
SkipList<Key, TestComparator> list(cmp, &arena);
SkipList<Key, TestComparator> list(cmp, &arena_impl);
ASSERT_TRUE(!list.Contains(10));
SkipList<Key, TestComparator>::Iterator iter(&list);
@ -49,9 +49,9 @@ TEST(SkipTest, InsertAndLookup) {
const int R = 5000;
Random rnd(1000);
std::set<Key> keys;
Arena arena;
ArenaImpl arena_impl;
TestComparator cmp;
SkipList<Key, TestComparator> list(cmp, &arena);
SkipList<Key, TestComparator> list(cmp, &arena_impl);
for (int i = 0; i < N; i++) {
Key key = rnd.Next() % R;
if (keys.insert(key).second) {
@ -204,14 +204,14 @@ class ConcurrentTest {
// Current state of the test
State current_;
Arena arena_;
ArenaImpl arena_impl_;
// SkipList is not protected by mu_. We just use a single writer
// thread to modify it.
SkipList<Key, TestComparator> list_;
public:
ConcurrentTest() : list_(TestComparator(), &arena_) { }
ConcurrentTest() : list_(TestComparator(), &arena_impl_) { }
// REQUIRES: External synchronization
void WriteStep(Random* rnd) {

@ -0,0 +1,102 @@
#ifndef STORAGE_LEVELDB_DB_SKIPLISTREP_H_
#define STORAGE_LEVELDB_DB_SKIPLISTREP_H_
#include "leveldb/memtablerep.h"
#include "db/memtable.h"
#include "db/skiplist.h"
namespace leveldb {
class Arena;
class SkipListRep : public MemTableRep {
SkipList<const char*, MemTableRep::KeyComparator&> skip_list_;
public:
explicit SkipListRep(MemTableRep::KeyComparator& compare, Arena* arena)
: skip_list_(compare, arena) {
}
// Insert key into the list.
// REQUIRES: nothing that compares equal to key is currently in the list.
virtual void Insert(const char* key) {
skip_list_.Insert(key);
}
// Returns true iff an entry that compares equal to key is in the list.
virtual bool Contains(const char* key) const {
return skip_list_.Contains(key);
}
virtual ~SkipListRep() { }
// Iteration over the contents of a skip list
class Iterator : public MemTableRep::Iterator {
SkipList<const char*, MemTableRep::KeyComparator&>::Iterator iter_;
public:
// Initialize an iterator over the specified list.
// The returned iterator is not valid.
explicit Iterator(
const SkipList<const char*, MemTableRep::KeyComparator&>* list
) : iter_(list) { }
virtual ~Iterator() { }
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const {
return iter_.Valid();
}
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const {
return iter_.key();
}
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() {
iter_.Next();
}
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() {
iter_.Prev();
}
// Advance to the first entry with a key >= target
virtual void Seek(const char* target) {
iter_.Seek(target);
}
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
virtual void SeekToFirst() {
iter_.SeekToFirst();
}
// Position at the last entry in list.
// Final state of iterator is Valid() iff list is not empty.
virtual void SeekToLast() {
iter_.SeekToLast();
}
};
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() {
return std::shared_ptr<MemTableRep::Iterator>(
new SkipListRep::Iterator(&skip_list_)
);
}
};
class SkipListFactory : public MemTableRepFactory {
public:
virtual std::shared_ptr<MemTableRep> CreateMemTableRep (
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::shared_ptr<MemTableRep>(new SkipListRep(compare, arena));
}
};
}
#endif // STORAGE_LEVELDB_DB_SKIPLISTREP_H_

@ -39,15 +39,19 @@ TableCache::~TableCache() {
Status TableCache::FindTable(const EnvOptions& toptions,
uint64_t file_number, uint64_t file_size,
Cache::Handle** handle, bool* tableIO) {
Cache::Handle** handle, bool* table_io,
const bool no_io) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
if (tableIO != nullptr) {
*tableIO = true; // we had to do IO from storage
if (no_io) { // Dont do IO and return a not-found status
return Status::NotFound("Table not found in table_cache, no_io is set");
}
if (table_io != nullptr) {
*table_io = true; // we had to do IO from storage
}
std::string fname = TableFileName(dbname_, file_number);
unique_ptr<RandomAccessFile> file;
@ -112,17 +116,21 @@ Status TableCache::Get(const ReadOptions& options,
const Slice& k,
void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
bool* tableIO,
bool* table_io,
void (*mark_key_may_exist)(void*),
const bool no_IO) {
const bool no_io) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, file_number, file_size,
&handle, tableIO);
&handle, table_io, no_io);
if (s.ok()) {
Table* t =
reinterpret_cast<Table*>(cache_->Value(handle));
s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_IO);
s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io);
cache_->Release(handle);
} else if (no_io && s.IsNotFound()) {
// Couldnt find Table in cache but treat as kFound if no_io set
(*mark_key_may_exist)(arg);
return Status::OK();
}
return s;
}

@ -48,9 +48,9 @@ class TableCache {
const Slice& k,
void* arg,
bool (*handle_result)(void*, const Slice&, const Slice&, bool),
bool* tableIO,
bool* table_io,
void (*mark_key_may_exist)(void*) = nullptr,
const bool no_IO = false);
const bool no_io = false);
// Evict any entry for the specified file number
void Evict(uint64_t file_number);
@ -62,9 +62,9 @@ class TableCache {
const EnvOptions& storage_options_;
std::shared_ptr<Cache> cache_;
Status FindTable(const EnvOptions& toptions,
uint64_t file_number, uint64_t file_size, Cache::Handle**,
bool* tableIO = nullptr);
Status FindTable(const EnvOptions& toptions, uint64_t file_number,
uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
const bool no_io = false);
};
} // namespace leveldb

@ -239,6 +239,7 @@ struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
Logger* logger;
@ -246,13 +247,17 @@ struct Saver {
};
}
// Called from TableCache::Get when bloom-filters can't guarantee that key does
// not exist and Get is not permitted to do IO to read the data-block and be
// certain.
// Set the key as Found and let the caller know that key-may-exist
// Called from TableCache::Get and InternalGet when file/block in which key may
// exist are not there in TableCache/BlockCache respectively. In this case we
// can't guarantee that key does not exist and are not permitted to do IO to be
// certain.Set the status=kFound and value_found=false to let the caller know
// that key may exist but is not there in memory
static void MarkKeyMayExist(void* arg) {
Saver* s = reinterpret_cast<Saver*>(arg);
s->state = kFound;
if (s->value_found != nullptr) {
*(s->value_found) = false;
}
}
static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
@ -348,7 +353,8 @@ void Version::Get(const ReadOptions& options,
Status *status,
GetStats* stats,
const Options& db_options,
const bool no_IO) {
const bool no_io,
bool* value_found) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator();
@ -357,13 +363,14 @@ void Version::Get(const ReadOptions& options,
auto logger = db_options.info_log;
assert(status->ok() || status->IsMergeInProgress());
if (no_IO) {
if (no_io) {
assert(status->ok());
}
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = ucmp;
saver.user_key = user_key;
saver.value_found = value_found;
saver.value = value;
saver.merge_operator = merge_operator;
saver.logger = logger.get();
@ -432,7 +439,7 @@ void Version::Get(const ReadOptions& options,
bool tableIO = false;
*status = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO,
MarkKeyMayExist, no_IO);
MarkKeyMayExist, no_io);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;

@ -75,7 +75,7 @@ class Version {
};
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, GetStats* stats, const Options& db_option,
const bool no_IO = false);
const bool no_io = false, bool* value_found = nullptr);
// Adds "stats" into the current state. Returns true if a new
// compaction may need to be triggered, false otherwise.
@ -136,6 +136,7 @@ class Version {
private:
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&,

@ -16,9 +16,12 @@
#include "leveldb/write_batch.h"
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "leveldb/statistics.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/memtable.h"
#include "db/snapshot.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include <stdexcept>
@ -139,6 +142,23 @@ class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;
const Options* options_;
DBImpl* db_;
const bool filter_deletes_;
MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
DB* db, const bool filter_deletes)
: sequence_(sequence),
mem_(mem),
options_(opts),
db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) {
assert(mem_);
if (filter_deletes_) {
assert(options_);
assert(db_);
}
}
virtual void Put(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeValue, key, value);
@ -149,17 +169,28 @@ class MemTableInserter : public WriteBatch::Handler {
sequence_++;
}
virtual void Delete(const Slice& key) {
if (filter_deletes_) {
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string value;
if (!db_->KeyMayExist(ropts, key, &value)) {
RecordTick(options_->statistics, NUMBER_FILTERED_DELETES);
return;
}
}
mem_->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
} // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
MemTable* memtable) {
MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
const Options* opts, DB* db,
const bool filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db,
filter_deletes);
return b->Iterate(&inserter);
}

@ -7,6 +7,8 @@
#include "leveldb/types.h"
#include "leveldb/write_batch.h"
#include "leveldb/db.h"
#include "leveldb/options.h"
namespace leveldb {
@ -39,7 +41,12 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
// Inserts batch entries into memtable
// Drops deletes in batch if filter_del is set to true and
// db->KeyMayExist returns false
static Status InsertInto(const WriteBatch* batch, MemTable* memtable,
const Options* opts = nullptr, DB* db = nullptr,
const bool filter_del = false);
static void Append(WriteBatch* dst, const WriteBatch* src);
};

@ -4,6 +4,8 @@
#include "leveldb/db.h"
#include <memory>
#include "db/skiplistrep.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/env.h"
@ -14,7 +16,8 @@ namespace leveldb {
static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator());
MemTable* mem = new MemTable(cmp);
auto factory = std::make_shared<SkipListFactory>();
MemTable* mem = new MemTable(cmp, factory);
mem->Ref();
std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem);

@ -0,0 +1,38 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Arena class defines memory allocation methods. It's used by memtable and
// skiplist.
#ifndef STORAGE_LEVELDB_INCLUDE_ARENA_H_
#define STORAGE_LEVELDB_INCLUDE_ARENA_H_
namespace leveldb {
class Arena {
public:
Arena() {};
virtual ~Arena() {};
// Return a pointer to a newly allocated memory block of "bytes" bytes.
virtual char* Allocate(size_t bytes) = 0;
// Allocate memory with the normal alignment guarantees provided by malloc.
virtual char* AllocateAligned(size_t bytes) = 0;
// Returns an estimate of the total memory used by arena.
virtual const size_t ApproximateMemoryUsage() = 0;
// Returns the total number of bytes in all blocks allocated so far.
virtual const size_t MemoryAllocatedBytes() = 0;
private:
// No copying allowed
Arena(const Arena&);
void operator=(const Arena&);
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_INCLUDE_ARENA_H_

@ -104,7 +104,8 @@ class DB {
//
// May return some other Status on an error.
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
const Slice& key,
std::string* value) = 0;
// If keys[i] does not exist in the database, then the i'th returned
// status will be one for which Status::IsNotFound() is true, and
@ -121,9 +122,21 @@ class DB {
std::vector<std::string>* values) = 0;
// If the key definitely does not exist in the database, then this method
// returns false. Otherwise return true. This check is potentially
// lighter-weight than invoking DB::Get(). No IO is performed
virtual bool KeyMayExist(const Slice& key) = 0;
// returns false, else true. If the caller wants to obtain value when the key
// is found in memory, a bool for 'value_found' must be passed. 'value_found'
// will be true on return if value has been set properly.
// This check is potentially lighter-weight than invoking DB::Get(). One way
// to make this lighter weight is to avoid doing any IOs.
// Default implementation here returns true and sets 'value_found' to false
virtual bool KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr) {
if (value_found != nullptr) {
*value_found = false;
}
return true;
}
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
@ -180,7 +193,14 @@ class DB {
// end==nullptr is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
// Note that after the entire database is compacted, all data are pushed
// down to the last level containing any data. If the total data size
// after compaction is reduced, that level might not be appropriate for
// hosting all the files. In this case, client could set reduce_level
// to true, to move the files back to the minimum level capable of holding
// the data set.
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false) = 0;
// Number of levels used for this DB.
virtual int NumberLevels() = 0;

@ -0,0 +1,88 @@
// This file contains the interface that must be implemented by any collection
// to be used as the backing store for a MemTable. Such a collection must
// satisfy the following properties:
// (1) It does not store duplicate items.
// (2) It uses MemTableRep::KeyComparator to compare items for iteration and
// equality.
// (3) It can be accessed concurrently by multiple readers but need not support
// concurrent writes.
// (4) Items are never deleted.
// The liberal use of assertions is encouraged to enforce (1).
#ifndef STORAGE_LEVELDB_DB_TABLE_H_
#define STORAGE_LEVELDB_DB_TABLE_H_
#include <memory>
#include "leveldb/arena.h"
namespace leveldb {
class MemTableRep {
public:
// KeyComparator(a, b) returns a negative value if a is less than b, 0 if they
// are equal, and a positive value if b is greater than a
class KeyComparator {
public:
virtual int operator()(const char* a, const char* b) const = 0;
virtual ~KeyComparator() { }
};
// Insert key into the collection. (The caller will pack key and value into a
// single buffer and pass that in as the parameter to Insert)
// REQUIRES: nothing that compares equal to key is currently in the
// collection.
virtual void Insert(const char* key) = 0;
// Returns true iff an entry that compares equal to key is in the collection.
virtual bool Contains(const char* key) const = 0;
virtual ~MemTableRep() { }
// Iteration over the contents of a skip collection
class Iterator {
public:
// Initialize an iterator over the specified collection.
// The returned iterator is not valid.
// explicit Iterator(const MemTableRep* collection);
virtual ~Iterator() { };
// Returns true iff the iterator is positioned at a valid node.
virtual bool Valid() const = 0;
// Returns the key at the current position.
// REQUIRES: Valid()
virtual const char* key() const = 0;
// Advances to the next position.
// REQUIRES: Valid()
virtual void Next() = 0;
// Advances to the previous position.
// REQUIRES: Valid()
virtual void Prev() = 0;
// Advance to the first entry with a key >= target
virtual void Seek(const char* target) = 0;
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() = 0;
// Position at the last entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToLast() = 0;
};
virtual std::shared_ptr<Iterator> GetIterator() = 0;
};
class MemTableRepFactory {
public:
virtual ~MemTableRepFactory() { };
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena* arena) = 0;
};
}
#endif // STORAGE_LEVELDB_DB_TABLE_H_

@ -13,6 +13,7 @@
#include "leveldb/slice.h"
#include "leveldb/statistics.h"
#include "leveldb/universal_compaction.h"
#include "leveldb/memtablerep.h"
namespace leveldb {
@ -224,9 +225,9 @@ struct Options {
// level-0 compaction will not be triggered by number of files at all.
int level0_file_num_compaction_trigger;
// Soft limit on number of level-0 files. We slow down writes at this point.
// A value <0 means that no writing slow down will be triggered by number
// of files in level-0.
// Soft limit on number of level-0 files. We start slowing down writes at this
// point. A value <0 means that no writing slow down will be triggered by
// number of files in level-0.
int level0_slowdown_writes_trigger;
// Maximum number of level-0 files. We stop writes at this point.
@ -380,6 +381,13 @@ struct Options {
// Number of shards used for table cache.
int table_cache_numshardbits;
// size of one block in arena memory allocation.
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// writer_buffer_size).
//
// Default: 0
size_t arena_block_size;
// Create an Options object with default values for all fields.
Options();
@ -477,14 +485,17 @@ struct Options {
// The options needed to support Universal Style compactions
CompactionOptionsUniversal compaction_options_universal;
// Use bloom-filter for deletes when this is true.
// db->Delete first calls KeyMayExist which checks memtable,immutable-memtable
// and bloom-filters to determine if the key does not exist in the database.
// If the key definitely does not exist, then the delete is a noop.KeyMayExist
// only incurs in-memory look up. This optimization avoids writing the delete
// to storage when appropriate.
// Use KeyMayExist API to filter deletes when this is true.
// If KeyMayExist returns false, i.e. the key definitely does not exist, then
// the delete is a noop. KeyMayExist only incurs in-memory look up.
// This optimization avoids writing the delete to storage when appropriate.
// Default: false
bool deletes_check_filter_first;
bool filter_deletes;
// This is a factory that provides MemTableRep objects.
// Default: a factory that provides a skip-list-based implementation of
// MemTableRep.
std::shared_ptr<MemTableRepFactory> memtable_factory;
};
// Options that control read operations

@ -80,7 +80,7 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{ STALL_L0_SLOWDOWN_MICROS, "rocksdb.l0.slowdown.micros" },
{ STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros" },
{ STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros" },
{ RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.dleay.millis" },
{ RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis" },
{ NO_ITERATORS, "rocksdb.num.iterators" },
{ NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get" },
{ NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read" },
@ -109,8 +109,13 @@ enum Histograms {
READ_BLOCK_COMPACTION_MICROS = 9,
READ_BLOCK_GET_MICROS = 10,
WRITE_RAW_BLOCK_MICROS = 11,
NUM_FILES_IN_SINGLE_COMPACTION = 12,
HISTOGRAM_ENUM_MAX = 13
STALL_L0_SLOWDOWN_COUNT = 12,
STALL_MEMTABLE_COMPACTION_COUNT = 13,
STALL_L0_NUM_FILES_COUNT = 14,
RATE_LIMIT_DELAY_COUNT = 15,
NUM_FILES_IN_SINGLE_COMPACTION = 16,
HISTOGRAM_ENUM_MAX = 17
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
@ -126,6 +131,10 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{ READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros" },
{ READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros" },
{ WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" },
{ STALL_L0_SLOWDOWN_COUNT, "rocksdb.l0.slowdown.count"},
{ STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"},
{ STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"},
{ RATE_LIMIT_DELAY_COUNT, "rocksdb.rate.limit.delay.count"},
{ NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" }
};

@ -235,7 +235,8 @@ Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value,
bool* didIO,
bool for_compaction) {
bool for_compaction,
const bool no_io) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache.get();
std::shared_ptr<Statistics> statistics = table->rep_->options.statistics;
@ -264,6 +265,8 @@ Iterator* Table::BlockReader(void* arg,
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
RecordTick(statistics, BLOCK_CACHE_HIT);
} else if (no_io) {
return nullptr; // Did not find in block_cache and can't do IO
} else {
Histograms histogram = for_compaction ?
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
@ -286,7 +289,9 @@ Iterator* Table::BlockReader(void* arg,
RecordTick(statistics, BLOCK_CACHE_MISS);
}
} else {
} else if (no_io) {
return nullptr; // Could not read from block_cache and can't do IO
}else {
s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO);
}
}
@ -324,7 +329,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
bool (*saver)(void*, const Slice&, const Slice&,
bool),
void (*mark_key_may_exist)(void*),
const bool no_IO) {
const bool no_io) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
bool done = false;
@ -340,16 +345,17 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
// cross one data block, we should be fine.
RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL);
break;
} else if (no_IO) {
// Update Saver.state to Found because we are only looking for whether
// bloom-filter can guarantee the key is not there when "no_IO"
(*mark_key_may_exist)(arg);
done = true;
} else {
bool didIO = false;
Iterator* block_iter = BlockReader(this, options, iiter->value(),
&didIO);
&didIO, no_io);
if (no_io && !block_iter) { // couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
(*mark_key_may_exist)(arg);
break;
}
for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) {
if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) {
done = true;

@ -77,7 +77,8 @@ class Table {
const EnvOptions& soptions, const Slice&,
bool for_compaction);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO, bool for_compaction = false);
bool* didIO, bool for_compaction = false,
const bool no_io = false);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
@ -88,7 +89,7 @@ class Table {
void* arg,
bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr,
const bool no_IO = false);
const bool no_io = false);
void ReadMeta(const Footer& footer);

@ -3,8 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <map>
#include <string>
#include <memory>
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/skiplistrep.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -342,8 +344,9 @@ class MemTableConstructor: public Constructor {
public:
explicit MemTableConstructor(const Comparator* cmp)
: Constructor(cmp),
internal_comparator_(cmp) {
memtable_ = new MemTable(internal_comparator_);
internal_comparator_(cmp),
table_factory_(new SkipListFactory) {
memtable_ = new MemTable(internal_comparator_, table_factory_);
memtable_->Ref();
}
~MemTableConstructor() {
@ -351,7 +354,7 @@ class MemTableConstructor: public Constructor {
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
memtable_->Unref();
memtable_ = new MemTable(internal_comparator_);
memtable_ = new MemTable(internal_comparator_, table_factory_);
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
@ -369,6 +372,7 @@ class MemTableConstructor: public Constructor {
private:
InternalKeyComparator internal_comparator_;
MemTable* memtable_;
std::shared_ptr<SkipListFactory> table_factory_;
};
class DBConstructor: public Constructor {
@ -805,7 +809,8 @@ class MemTableTest { };
TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator());
MemTable* memtable = new MemTable(cmp);
auto table_factory = std::make_shared<SkipListFactory>();
MemTable* memtable = new MemTable(cmp, table_factory);
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);

@ -79,7 +79,8 @@ def main(argv):
' --target_file_size_multiplier=2 ' + \
' --max_write_buffer_number=3 ' + \
' --max_background_compactions=20 ' + \
' --max_bytes_for_level_base=10485760'
' --max_bytes_for_level_base=10485760 ' + \
' --filter_deletes=' + str(random.randint(0, 1))
killtime = time.time() + interval
child = subprocess.Popen(['./db_stress \
--test_batches_snapshots=1 \

@ -84,7 +84,8 @@ def main(argv):
' --target_file_size_multiplier=2 ' + \
' --max_write_buffer_number=3 ' + \
' --max_background_compactions=20 ' + \
' --max_bytes_for_level_base=10485760'
' --max_bytes_for_level_base=10485760 ' + \
' --filter_deletes=' + str(random.randint(0, 1))
print ("Running db_stress with additional options=\n"
+ additional_opts + "\n")

@ -183,8 +183,8 @@ static uint32_t FLAGS_log2_keys_per_lock = 2; // implies 2^2 keys per lock
// Percentage of times we want to purge redundant keys in memory before flushing
static uint32_t FLAGS_purge_redundant_percent = 50;
// On true, deletes use bloom-filter and drop the delete if key not present
static bool FLAGS_deletes_check_filter_first = false;
// On true, deletes use KeyMayExist to drop the delete if key not present
static bool FLAGS_filter_deletes = false;
// Level0 compaction start trigger
static int FLAGS_level0_file_num_compaction_trigger = 0;
@ -907,7 +907,7 @@ class StressTest {
fprintf(stdout, "Purge redundant %% : %d\n",
FLAGS_purge_redundant_percent);
fprintf(stdout, "Deletes use filter : %d\n",
FLAGS_deletes_check_filter_first);
FLAGS_filter_deletes);
fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock);
@ -964,7 +964,7 @@ class StressTest {
options.delete_obsolete_files_period_micros =
FLAGS_delete_obsolete_files_period_micros;
options.max_manifest_file_size = 1024;
options.deletes_check_filter_first = FLAGS_deletes_check_filter_first;
options.filter_deletes = FLAGS_filter_deletes;
static Random purge_percent(1000); // no benefit from non-determinism here
if (purge_percent.Uniform(100) < FLAGS_purge_redundant_percent - 1) {
options.purge_redundant_kvs_while_flush = false;
@ -1168,9 +1168,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--purge_redundant_percent=%d%c", &n, &junk) == 1
&& (n >= 0 && n <= 100)) {
FLAGS_purge_redundant_percent = n;
} else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk)
} else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
== 1 && (n == 0 || n == 1)) {
FLAGS_deletes_check_filter_first = n;
FLAGS_filter_deletes = n;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);

@ -2,27 +2,32 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena.h"
#include <assert.h>
#include "util/arena_impl.h"
namespace leveldb {
static const int kBlockSize = 4096;
ArenaImpl::ArenaImpl(size_t block_size) {
if (block_size < kMinBlockSize) {
block_size_ = kMinBlockSize;
} else if (block_size > kMaxBlockSize) {
block_size_ = kMaxBlockSize;
} else {
block_size_ = block_size;
}
Arena::Arena() {
blocks_memory_ = 0;
alloc_ptr_ = nullptr; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
}
Arena::~Arena() {
ArenaImpl::~ArenaImpl() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}
char* Arena::AllocateFallback(size_t bytes) {
if (bytes > kBlockSize / 4) {
char* ArenaImpl::AllocateFallback(size_t bytes) {
if (bytes > block_size_ / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
@ -30,8 +35,8 @@ char* Arena::AllocateFallback(size_t bytes) {
}
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;
alloc_ptr_ = AllocateNewBlock(block_size_);
alloc_bytes_remaining_ = block_size_;
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
@ -39,7 +44,7 @@ char* Arena::AllocateFallback(size_t bytes) {
return result;
}
char* Arena::AllocateAligned(size_t bytes) {
char* ArenaImpl::AllocateAligned(size_t bytes) {
const int align = sizeof(void*); // We'll align to pointer size
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
@ -58,7 +63,7 @@ char* Arena::AllocateAligned(size_t bytes) {
return result;
}
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* ArenaImpl::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_memory_ += block_bytes;
blocks_.push_back(result);

@ -2,38 +2,53 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_
#define STORAGE_LEVELDB_UTIL_ARENA_H_
// ArenaImpl is an implementation of Arena class. For a request of small size,
// it allocates a block with pre-defined block size. For a request of big
// size, it uses malloc to directly get the requested size.
#ifndef STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_
#define STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_
#include <cstddef>
#include <vector>
#include <assert.h>
#include <stdint.h>
#include "leveldb/arena.h"
namespace leveldb {
class Arena {
class ArenaImpl : public Arena {
public:
Arena();
~Arena();
explicit ArenaImpl(size_t block_size = kMinBlockSize);
virtual ~ArenaImpl();
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
virtual char* Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
char* AllocateAligned(size_t bytes);
virtual char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena (including space allocated but not yet used for user
// allocations).
size_t MemoryUsage() const {
//
// TODO: Do we need to exclude space allocated but not used?
virtual const size_t ApproximateMemoryUsage() {
return blocks_memory_ + blocks_.capacity() * sizeof(char*);
}
virtual const size_t MemoryAllocatedBytes() {
return blocks_memory_;
}
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
static const size_t kMinBlockSize = 4096;
static const size_t kMaxBlockSize = 2 << 30;
// Number of bytes allocated in one block
size_t block_size_;
// Allocation state
char* alloc_ptr_;
size_t alloc_bytes_remaining_;
@ -45,11 +60,11 @@ class Arena {
size_t blocks_memory_;
// No copying allowed
Arena(const Arena&);
void operator=(const Arena&);
ArenaImpl(const ArenaImpl&);
void operator=(const ArenaImpl&);
};
inline char* Arena::Allocate(size_t bytes) {
inline char* ArenaImpl::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
@ -65,4 +80,4 @@ inline char* Arena::Allocate(size_t bytes) {
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_ARENA_H_
#endif // STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_

@ -2,22 +2,59 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena.h"
#include "util/arena_impl.h"
#include "util/random.h"
#include "util/testharness.h"
namespace leveldb {
class ArenaTest { };
class ArenaImplTest { };
TEST(ArenaImplTest, Empty) {
ArenaImpl arena0;
}
TEST(ArenaImplTest, MemoryAllocatedBytes) {
const int N = 17;
size_t req_sz; //requested size
size_t bsz = 8192; // block size
size_t expected_memory_allocated;
TEST(ArenaTest, Empty) {
Arena arena;
ArenaImpl arena_impl(bsz);
// requested size > quarter of a block:
// allocate requested size separately
req_sz = 3001;
for (int i = 0; i < N; i++) {
arena_impl.Allocate(req_sz);
}
expected_memory_allocated = req_sz * N;
ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated);
// requested size < quarter of a block:
// allocate a block with the default size, then try to use unused part
// of the block. So one new block will be allocated for the first
// Allocate(99) call. All the remaining calls won't lead to new allocation.
req_sz = 99;
for (int i = 0; i < N; i++) {
arena_impl.Allocate(req_sz);
}
expected_memory_allocated += bsz;
ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated);
// requested size > quarter of a block:
// allocate requested size separately
req_sz = 99999999;
for (int i = 0; i < N; i++) {
arena_impl.Allocate(req_sz);
}
expected_memory_allocated += req_sz * N;
ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated);
}
TEST(ArenaTest, Simple) {
TEST(ArenaImplTest, Simple) {
std::vector<std::pair<size_t, char*> > allocated;
Arena arena;
ArenaImpl arena_impl;
const int N = 100000;
size_t bytes = 0;
Random rnd(301);
@ -35,9 +72,9 @@ TEST(ArenaTest, Simple) {
}
char* r;
if (rnd.OneIn(10)) {
r = arena.AllocateAligned(s);
r = arena_impl.AllocateAligned(s);
} else {
r = arena.Allocate(s);
r = arena_impl.Allocate(s);
}
for (unsigned int b = 0; b < s; b++) {
@ -46,9 +83,9 @@ TEST(ArenaTest, Simple) {
}
bytes += s;
allocated.push_back(std::make_pair(s, r));
ASSERT_GE(arena.MemoryUsage(), bytes);
ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes);
if (i > N/10) {
ASSERT_LE(arena.MemoryUsage(), bytes * 1.10);
ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10);
}
}
for (unsigned int i = 0; i < allocated.size(); i++) {

@ -12,6 +12,7 @@
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/merge_operator.h"
#include "db/skiplistrep.h"
namespace leveldb {
@ -60,6 +61,7 @@ Options::Options()
max_manifest_file_size(std::numeric_limits<uint64_t>::max()),
no_block_cache(false),
table_cache_numshardbits(4),
arena_block_size(0),
disable_auto_compactions(false),
WAL_ttl_seconds(0),
manifest_preallocation_size(4 * 1024 * 1024),
@ -76,7 +78,9 @@ Options::Options()
use_adaptive_mutex(false),
bytes_per_sync(0),
compaction_style(kCompactionStyleLevel),
deletes_check_filter_first(false) {
filter_deletes(false),
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)) {
assert(memtable_factory.get() != nullptr);
}
static const char* const access_hints[] = {
@ -172,6 +176,8 @@ Options::Dump(Logger* log) const
no_block_cache);
Log(log," Options.table_cache_numshardbits: %d",
table_cache_numshardbits);
Log(log," Options.arena_block_size: %ld",
arena_block_size);
Log(log," Options.delete_obsolete_files_period_micros: %ld",
delete_obsolete_files_period_micros);
Log(log," Options.max_background_compactions: %d",
@ -210,10 +216,10 @@ Options::Dump(Logger* log) const
use_adaptive_mutex);
Log(log," Options.bytes_per_sync: %ld",
bytes_per_sync);
Log(log," Options.filter_deletes: %d",
filter_deletes);
Log(log," Options.compaction_style: %d",
compaction_style);
Log(log," Options.deletes_check_filter_first: %d",
deletes_check_filter_first);
Log(log," Options.compaction_options_universal.size_ratio: %d",
compaction_options_universal.size_ratio);
Log(log," Options.compaction_options_universal.min_merge_width: %d",

@ -21,6 +21,10 @@ DBWithTTL::DBWithTTL(const int32_t ttl,
assert(options.compaction_filter == nullptr);
Options options_to_open = options;
options_to_open.compaction_filter = this;
if (options.merge_operator) {
ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator));
options_to_open.merge_operator = ttl_merge_op_.get();
}
if (read_only) {
st = DB::OpenForReadOnly(options_to_open, dbname, &db_);
} else {
@ -125,15 +129,12 @@ Status DBWithTTL::StripTS(std::string* str) {
}
Status DBWithTTL::Put(
const WriteOptions& o,
const WriteOptions& opt,
const Slice& key,
const Slice& val) {
std::string value_with_ts;
Status st = AppendTS(val, value_with_ts);
if (!st.ok()) {
return st;
}
return db_->Put(o, key, value_with_ts);
WriteBatch batch;
batch.Put(key, val);
return Write(opt, &batch);
}
Status DBWithTTL::Get(const ReadOptions& options,
@ -158,18 +159,23 @@ std::vector<Status> DBWithTTL::MultiGet(const ReadOptions& options,
supported with TTL"));
}
bool DBWithTTL::KeyMayExist(const Slice& key) {
return db_->KeyMayExist(key);
bool DBWithTTL::KeyMayExist(ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found) {
return db_->KeyMayExist(options, key, value, value_found);
}
Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) {
return db_->Delete(wopts, key);
}
Status DBWithTTL::Merge(const WriteOptions& options,
Status DBWithTTL::Merge(const WriteOptions& opt,
const Slice& key,
const Slice& value) {
return Status::NotSupported("Merge operation not supported.");
WriteBatch batch;
batch.Merge(key, value);
return Write(opt, &batch);
}
Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
@ -187,8 +193,13 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
}
}
virtual void Merge(const Slice& key, const Slice& value) {
// TTL doesn't support merge operation
batch_rewrite_status = Status::NotSupported("TTL doesn't support Merge");
std::string value_with_ts;
Status st = AppendTS(value, value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
updates_ttl.Merge(key, value_with_ts);
}
}
virtual void Delete(const Slice& key) {
updates_ttl.Delete(key);
@ -223,8 +234,9 @@ void DBWithTTL::GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
db_->GetApproximateSizes(r, n, sizes);
}
void DBWithTTL::CompactRange(const Slice* begin, const Slice* end) {
db_->CompactRange(begin, end);
void DBWithTTL::CompactRange(const Slice* begin, const Slice* end,
bool reduce_level) {
db_->CompactRange(begin, end, reduce_level);
}
int DBWithTTL::NumberLevels() {

@ -5,8 +5,10 @@
#ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_
#define LEVELDB_UTILITIES_TTL_DB_TTL_H_
#include "include/leveldb/db.h"
#include "include/leveldb/compaction_filter.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/compaction_filter.h"
#include "leveldb/merge_operator.h"
#include "db/db_impl.h"
namespace leveldb {
@ -33,7 +35,10 @@ class DBWithTTL : public DB, CompactionFilter {
const std::vector<Slice>& keys,
std::vector<std::string>* values);
virtual bool KeyMayExist(const Slice& key);
virtual bool KeyMayExist(ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr);
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
@ -54,7 +59,8 @@ class DBWithTTL : public DB, CompactionFilter {
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false);
virtual int NumberLevels();
@ -106,6 +112,7 @@ class DBWithTTL : public DB, CompactionFilter {
private:
DB* db_;
int32_t ttl_;
unique_ptr<MergeOperator> ttl_merge_op_;
};
class TtlIterator : public Iterator {
@ -169,5 +176,56 @@ class TtlIterator : public Iterator {
Iterator* iter_;
};
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const MergeOperator* merge_op)
: user_merge_op_(merge_op) {
assert(merge_op);
}
virtual void Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const {
const uint32_t& ts_len = DBWithTTL::kTSLength;
if ((existing_value && existing_value->size() < ts_len) ||
value.size() < ts_len) {
Log(logger, "Error: Could not remove timestamp correctly from value.");
assert(false);
//TODO: Remove assert and make this function return false.
//TODO: Change Merge semantics and add a counter here
}
Slice value_without_ts(value.data(), value.size() - ts_len);
if (existing_value) {
Slice existing_value_without_ts(existing_value->data(),
existing_value->size() - ts_len);
user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts,
new_value, logger);
} else {
user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger);
}
int32_t curtime;
if (!DBWithTTL::GetCurrentTime(curtime).ok()) {
Log(logger, "Error: Could not get current time to be attached internally "
"to the new value.");
assert(false);
//TODO: Remove assert and make this function return false.
} else {
char ts_string[ts_len];
EncodeFixed32(ts_string, curtime);
new_value->append(ts_string, ts_len);
}
}
virtual const char* Name() const {
return "Merge By TTL";
}
private:
const MergeOperator* user_merge_op_;
};
}
#endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_

Loading…
Cancel
Save