diff --git a/Makefile b/Makefile index 9b1d05fcf..765e52b52 100644 --- a/Makefile +++ b/Makefile @@ -240,8 +240,8 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) -merge_test: db/merge_test.o $(LIBOBJECTS) - $(CXX) db/merge_test.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm -f $@ diff --git a/README b/README index 4ac8f93cb..4d605f5bc 100644 --- a/README +++ b/README @@ -1,5 +1,7 @@ rocksdb: A persistent key-value store for flash storage -Authors: The Facebook Database Engineering Team +Authors: * The Facebook Database Engineering Team + * Build on earlier work on leveldb by Sanjay Ghemawat + (sanjay@google.com) and Jeff Dean (jeff@google.com) This code is a library that forms the core building block for a fast key value server, especially suited for storing data on flash drives. @@ -56,6 +58,25 @@ include/env.h Abstraction of the OS environment. A posix implementation of this interface is in util/env_posix.cc -include/table.h include/table_builder.h Lower-level modules that most clients probably won't use directly + +include/cache.h + An API for the block cache. + +include/compaction_filter.h + An API for a application filter invoked on every compaction. + +include/filter_policy.h + An API for configuring a bloom filter. + +include/memtablerep.h + An API for implementing a memtable. + +include/statistics.h + An API to retrieve various database statistics. + +include/transaction_log_iterator.h + An API to retrieve transaction logs from a database. + + diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 7d9b4b068..4ed04c0d1 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -57,6 +57,7 @@ class CorruptionTest { opt.env = &env_; opt.block_cache = tiny_cache_; opt.block_size_deviation = 0; + opt.arena_block_size = 4096; return DB::Open(opt, dbname_, &db_); } diff --git a/db/db_bench.cc b/db/db_bench.cc index 1f9dbd1b4..db73367ba 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -338,7 +338,7 @@ static auto FLAGS_bytes_per_sync = leveldb::Options().bytes_per_sync; // On true, deletes use bloom-filter and drop the delete if key not present -static bool FLAGS_deletes_check_filter_first = false; +static bool FLAGS_filter_deletes = false; namespace leveldb { @@ -1128,7 +1128,7 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; - options.deletes_check_filter_first = FLAGS_deletes_check_filter_first; + options.filter_deletes = FLAGS_filter_deletes; if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) { if (FLAGS_max_bytes_for_level_multiplier_additional.size() != (unsigned int)FLAGS_num_levels) { @@ -2246,9 +2246,9 @@ int main(int argc, char** argv) { FLAGS_keys_per_multiget = n; } else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) { FLAGS_bytes_per_sync = l; - } else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk) + } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk) == 1 && (n == 0 || n ==1 )) { - FLAGS_deletes_check_filter_first = n; + FLAGS_filter_deletes = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 5cf3a28b9..29d65d15b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -129,6 +129,12 @@ Options SanitizeOptions(const std::string& dbname, ((size_t)64)<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); + // if user sets arena_block_size, we trust user to use this value. Otherwise, + // calculate a proper value from writer_buffer_size; + if (result.arena_block_size <= 0) { + result.arena_block_size = result.write_buffer_size / 10; + } + result.min_write_buffer_number_to_merge = std::min( result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1); if (result.info_log == nullptr) { @@ -164,7 +170,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_(new MemTable(internal_comparator_, NumberLevels())), + mem_rep_factory_(options_.memtable_factory), + mem_(new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_)), logfile_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -178,20 +186,28 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stall_level0_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), + stall_level0_slowdown_count_(0), + stall_memtable_compaction_count_(0), + stall_level0_num_files_count_(0), started_at_(options.env->NowMicros()), flush_on_destroy_(false), stats_(options.num_levels), delayed_writes_(0), last_flushed_sequence_(0), - storage_options_(options) { + storage_options_(options), + bg_work_gate_closed_(false), + refitting_level_(false) { mem_->Ref(); env_->GetAbsolutePath(dbname, &db_absolute_path_); stall_leveln_slowdown_.resize(options.num_levels); - for (int i = 0; i < options.num_levels; ++i) + stall_leveln_slowdown_count_.resize(options.num_levels); + for (int i = 0; i < options.num_levels; ++i) { stall_leveln_slowdown_[i] = 0; + stall_leveln_slowdown_count_[i] = 0; + } // Reserve ten files or so for other uses and give the rest to TableCache. const int table_cache_size = options_.max_open_files - 10; @@ -687,10 +703,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_, NumberLevels()); + mem = new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_); mem->Ref(); } - status = WriteBatchInternal::InsertInto(&batch, mem); + status = WriteBatchInternal::InsertInto(&batch, mem, &options_); MaybeIgnoreError(&status); if (!status.ok()) { break; @@ -904,7 +921,8 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { return s; } -void DBImpl::CompactRange(const Slice* begin, const Slice* end) { +void DBImpl::CompactRange(const Slice* begin, const Slice* end, + bool reduce_level) { int max_level_with_files = 1; { MutexLock l(&mutex_); @@ -919,6 +937,79 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } + + if (reduce_level) { + ReFitLevel(max_level_with_files); + } +} + +// return the same level if it cannot be moved +int DBImpl::FindMinimumEmptyLevelFitting(int level) { + mutex_.AssertHeld(); + int minimum_level = level; + for (int i = level - 1; i > 0; --i) { + // stop if level i is not empty + if (versions_->NumLevelFiles(i) > 0) break; + + // stop if level i is too small (cannot fit the level files) + if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break; + + minimum_level = i; + } + return minimum_level; +} + +void DBImpl::ReFitLevel(int level) { + assert(level < NumberLevels()); + + MutexLock l(&mutex_); + + // only allow one thread refitting + if (refitting_level_) { + Log(options_.info_log, "ReFitLevel: another thread is refitting"); + return; + } + refitting_level_ = true; + + // wait for all background threads to stop + bg_work_gate_closed_ = true; + while (bg_compaction_scheduled_ > 0) { + Log(options_.info_log, + "RefitLevel: waiting for background threads to stop: %d", + bg_compaction_scheduled_); + bg_cv_.Wait(); + } + + // move to a smaller level + int to_level = FindMinimumEmptyLevelFitting(level); + + assert(to_level <= level); + + if (to_level < level) { + Log(options_.info_log, "Before refitting:\n%s", + versions_->current()->DebugString().data()); + + VersionEdit edit(NumberLevels()); + for (const auto& f : versions_->current()->files_[level]) { + edit.DeleteFile(level, f->number); + edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); + } + Log(options_.info_log, "Apply version edit:\n%s", + edit.DebugString().data()); + + auto status = versions_->LogAndApply(&edit, &mutex_); + + Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); + + if (status.ok()) { + Log(options_.info_log, "After refitting:\n%s", + versions_->current()->DebugString().data()); + } + } + + refitting_level_ = false; + bg_work_gate_closed_ = false; } int DBImpl::NumberLevels() { @@ -1242,7 +1333,9 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_ >= options_.max_background_compactions) { + if (bg_work_gate_closed_) { + // gate closed for backgrond work + } else if (bg_compaction_scheduled_ >= options_.max_background_compactions) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions @@ -2019,13 +2112,11 @@ Status DBImpl::Get(const ReadOptions& options, return GetImpl(options, key, value); } -// If no_IO is true, then returns Status::NotFound if key is not in memtable, -// immutable-memtable and bloom-filters can guarantee that key is not in db, -// "value" is garbage string if no_IO is true Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_IO) { + const bool no_io, + bool* value_found) { Status s; StopWatch sw(env_, options_.statistics, DB_GET); @@ -2054,12 +2145,12 @@ Status DBImpl::GetImpl(const ReadOptions& options, // s is both in/out. When in, s could either be OK or MergeInProgress. // value will contain the current merge operand in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_, no_IO)) { + if (mem->Get(lkey, value, &s, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_, no_IO)) { + } else if (imm.Get(lkey, value, &s, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_, no_IO); + current->Get(options, lkey, value, &s, &stats, options_, no_io,value_found); have_stat_update = true; } mutex_.Lock(); @@ -2149,10 +2240,14 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, return statList; } -bool DBImpl::KeyMayExist(const Slice& key) { - std::string value; - const Status s = GetImpl(ReadOptions(), key, &value, true); - return !s.IsNotFound(); +bool DBImpl::KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found) { + if (value_found != nullptr) { + *value_found = true; // falsify later if key-may-exist but can't fetch value + } + return GetImpl(options, key, value, true, value_found).ok(); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { @@ -2190,10 +2285,6 @@ Status DBImpl::Merge(const WriteOptions& o, const Slice& key, } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { - if (options_.deletes_check_filter_first && !KeyMayExist(key)) { - RecordTick(options_.statistics, NUMBER_FILTERED_DELETES); - return Status::OK(); - } return DB::Delete(options, key); } @@ -2252,7 +2343,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); + status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, + options_.filter_deletes); if (!status.ok()) { // Panic for in-memory corruptions // Note that existing logic was not sound. Any partial failure writing @@ -2341,6 +2433,40 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { return result; } +// This function computes the amount of time in microseconds by which a write +// should be delayed based on the number of level-0 files according to the +// following formula: +// if num_level_files < level0_slowdown_writes_trigger, return 0; +// if num_level_files >= level0_stop_writes_trigger, return 1000; +// otherwise, let r = (num_level_files - level0_slowdown) / +// (level0_stop - level0_slowdown) +// and return r^2 * 1000. +// The goal of this formula is to gradually increase the rate at which writes +// are slowed. We also tried linear delay (r * 1000), but it seemed to do +// slightly worse. There is no other particular reason for choosing quadratic. +uint64_t DBImpl::SlowdownAmount(int num_level0_files) { + uint64_t delay; + int stop_trigger = options_.level0_stop_writes_trigger; + int slowdown_trigger = options_.level0_slowdown_writes_trigger; + if (num_level0_files >= stop_trigger) { + delay = 1000; + } + else if (num_level0_files < slowdown_trigger) { + delay = 0; + } + else { + // If we are here, we know that: + // slowdown_trigger <= num_level0_files < stop_trigger + // since the previous two conditions are false. + float how_much = + (float) (num_level0_files - slowdown_trigger) / + (stop_trigger - slowdown_trigger); + delay = how_much * how_much * 1000; + } + assert(delay <= 1000); + return delay; +} + // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { @@ -2364,15 +2490,19 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each - // individual write by 1ms to reduce latency variance. Also, + // individual write by 0-1ms to reduce latency variance. Also, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. mutex_.Unlock(); - uint64_t t1 = env_->NowMicros(); - env_->SleepForMicroseconds(1000); - uint64_t delayed = env_->NowMicros() - t1; + uint64_t delayed; + { + StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); + env_->SleepForMicroseconds(SlowdownAmount(versions_->NumLevelFiles(0))); + delayed = sw.ElapsedMicros(); + } RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); stall_level0_slowdown_ += delayed; + stall_level0_slowdown_count_++; allow_delay = false; // Do not delay a single write more than once //Log(options_.info_log, // "delaying write %llu usecs for level0_slowdown_writes_trigger\n", @@ -2391,21 +2521,30 @@ Status DBImpl::MakeRoomForWrite(bool force) { // ones are still being compacted, so we wait. DelayLoggingAndReset(); Log(options_.info_log, "wait for memtable compaction...\n"); - uint64_t t1 = env_->NowMicros(); - bg_cv_.Wait(); - const uint64_t stall = env_->NowMicros() -t1; + uint64_t stall; + { + StopWatch sw(env_, options_.statistics, + STALL_MEMTABLE_COMPACTION_COUNT); + bg_cv_.Wait(); + stall = sw.ElapsedMicros(); + } RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; + stall_memtable_compaction_count_++; } else if (versions_->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); - uint64_t t1 = env_->NowMicros(); Log(options_.info_log, "wait for fewer level0 files...\n"); - bg_cv_.Wait(); - const uint64_t stall = env_->NowMicros() - t1; + uint64_t stall; + { + StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT); + bg_cv_.Wait(); + stall = sw.ElapsedMicros(); + } RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall); stall_level0_num_files_ += stall; + stall_level0_num_files_count_++; } else if ( allow_rate_limit_delay && options_.rate_limit > 1.0 && @@ -2413,10 +2552,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { // Delay a write when the compaction score for any level is too large. int max_level = versions_->MaxCompactionScoreLevel(); mutex_.Unlock(); - uint64_t t1 = env_->NowMicros(); - env_->SleepForMicroseconds(1000); - uint64_t delayed = env_->NowMicros() - t1; + uint64_t delayed; + { + StopWatch sw(env_, options_.statistics, RATE_LIMIT_DELAY_COUNT); + env_->SleepForMicroseconds(1000); + delayed = sw.ElapsedMicros(); + } stall_leveln_slowdown_[max_level] += delayed; + stall_leveln_slowdown_count_[max_level]++; // Make sure the following value doesn't round to zero. uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); rate_limit_delay_millis += rate_limit; @@ -2454,7 +2597,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_.reset(new log::Writer(std::move(lfile))); mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); - mem_ = new MemTable(internal_comparator_, NumberLevels()); + mem_ = new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); @@ -2510,6 +2654,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Add "+1" to make sure seconds_up is > 0 and avoid NaN later double seconds_up = (micros_up + 1) / 1000000.0; uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; uint64_t interval_bytes_written = 0; uint64_t interval_bytes_read = 0; uint64_t interval_bytes_new = 0; @@ -2518,8 +2663,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall\n" - "----------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n" + "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); for (int level = 0; level < NumberLevels(); level++) { @@ -2539,7 +2684,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf( buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f\n", + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n", level, files, versions_->NumLevelBytes(level) / 1048576.0, @@ -2561,8 +2706,10 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { stats_[level].files_out_levelnp1, stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, stats_[level].count, - stall_leveln_slowdown_[level] / 1000000.0); + stall_leveln_slowdown_[level] / 1000000.0, + (unsigned long) stall_leveln_slowdown_count_[level]); total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; value->append(buf); } } @@ -2638,6 +2785,15 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { total_slowdown / 1000000.0); value->append(buf); + snprintf(buf, sizeof(buf), + "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " + "%lu memtable_compaction, %lu leveln_slowdown\n", + (unsigned long) stall_level0_slowdown_count_, + (unsigned long) stall_level0_num_files_count_, + (unsigned long) stall_memtable_compaction_count_, + (unsigned long) total_slowdown_count); + value->append(buf); + last_stats_.bytes_read_ = total_bytes_read; last_stats_.bytes_written_ = total_bytes_written; last_stats_.bytes_new_ = stats_[0].bytes_written; @@ -2708,8 +2864,7 @@ Status DB::Merge(const WriteOptions& opt, const Slice& key, DB::~DB() { } -Status DB::Open(const Options& options, const std::string& dbname, - DB** dbptr) { +Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; EnvOptions soptions; diff --git a/db/db_impl.h b/db/db_impl.h index 5f09035f2..333a86867 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -18,6 +18,7 @@ #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" +#include "leveldb/memtablerep.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -49,15 +50,21 @@ class DBImpl : public DB { const std::vector& keys, std::vector* values); - // Returns false if key can't exist- based on memtable, immutable-memtable and - // bloom-filters; true otherwise. No IO is performed - virtual bool KeyMayExist(const Slice& key); + // Returns false if key doesn't exist in the database and true if it may. + // If value_found is not passed in as null, then return the value if found in + // memory. On return, if value was found, then value_found will be set to true + // , otherwise false. + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr); virtual Iterator* NewIterator(const ReadOptions&); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end); + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false); virtual int NumberLevels(); virtual int MaxMemCompactionLevel(); virtual int Level0StopWriteTrigger(); @@ -159,6 +166,7 @@ class DBImpl : public DB { Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber); + uint64_t SlowdownAmount(int num_level0_files); Status MakeRoomForWrite(bool force /* compact even if there is room? */); WriteBatch* BuildBatchGroup(Writer** last_writer); @@ -221,6 +229,14 @@ class DBImpl : public DB { // dump leveldb.stats to LOG void MaybeDumpStats(); + // Return the minimum empty level that could hold the total data in the + // input level. Return the input level, if such level could not be found. + int FindMinimumEmptyLevelFitting(int level); + + // Move the files in the input level to the minimum level that could hold + // the data set. + void ReFitLevel(int level); + // Constant after construction const InternalFilterPolicy internal_filter_policy_; bool owns_info_log_; @@ -235,6 +251,7 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes + std::shared_ptr mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; @@ -293,6 +310,10 @@ class DBImpl : public DB { uint64_t stall_memtable_compaction_; uint64_t stall_level0_num_files_; std::vector stall_leveln_slowdown_; + uint64_t stall_level0_slowdown_count_; + uint64_t stall_memtable_compaction_count_; + uint64_t stall_level0_num_files_count_; + std::vector stall_leveln_slowdown_count_; // Time at which this instance was started. const uint64_t started_at_; @@ -370,6 +391,12 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions storage_options_; + // A value of true temporarily disables scheduling of background work + bool bg_work_gate_closed_; + + // Guard against multiple concurrent refitting + bool refitting_level_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); @@ -384,11 +411,13 @@ class DBImpl : public DB { std::vector& snapshots, SequenceNumber* prev_snapshot); - // Function that Get and KeyMayExist call with no_IO true or false + // Function that Get and KeyMayExist call with no_io true or false + // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_IO = false); + const bool no_io = false, + bool* value_found = nullptr); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 317d290d0..6199b5e7b 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -47,7 +47,8 @@ public: virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual void CompactRange(const Slice* begin, const Slice* end) { + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false) { } virtual Status DisableFileDeletions() { return Status::NotSupported("Not supported operation in read only mode."); diff --git a/db/db_test.cc b/db/db_test.cc index 02b77f168..8e24cb79f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -291,7 +291,7 @@ class DBTest { // TODO -- test more options break; case kDeletesFilterFirst: - options.deletes_check_filter_first = true; + options.filter_deletes = true; break; default: break; @@ -772,39 +772,84 @@ TEST(DBTest, GetEncountersEmptyLevel) { } while (ChangeOptions()); } -// KeyMayExist-API returns false if memtable(s) and in-memory bloom-filters can -// guarantee that the key doesn't exist in the db, else true. This can lead to -// a few false positives, but not false negatives. To make test deterministic, -// use a much larger number of bits per key-20 than bits in the key, so -// that false positives are eliminated +// KeyMayExist can lead to a few false positives, but not false negatives. +// To make test deterministic, use a much larger number of bits per key-20 than +// bits in the key, so that false positives are eliminated TEST(DBTest, KeyMayExist) { do { + ReadOptions ropts; + std::string value; Options options = CurrentOptions(); options.filter_policy = NewBloomFilterPolicy(20); Reopen(&options); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_OK(db_->Put(WriteOptions(), "a", "b")); - ASSERT_TRUE(db_->KeyMayExist("a")); + bool value_found = false; + ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); + ASSERT_TRUE(value_found); + ASSERT_EQ("b", value); dbfull()->Flush(FlushOptions()); - ASSERT_TRUE(db_->KeyMayExist("a")); + value.clear(); + value_found = false; + ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); + ASSERT_TRUE(value_found); + ASSERT_EQ("b", value); ASSERT_OK(db_->Delete(WriteOptions(), "a")); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); dbfull()->Flush(FlushOptions()); dbfull()->CompactRange(nullptr, nullptr); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_OK(db_->Delete(WriteOptions(), "c")); - ASSERT_TRUE(!db_->KeyMayExist("c")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value)); delete options.filter_policy; } while (ChangeOptions()); } +// A delete is skipped for key if KeyMayExist(key) returns False +// Tests Writebatch consistency and proper delete behaviour +TEST(DBTest, FilterDeletes) { + Options options = CurrentOptions(); + options.filter_policy = NewBloomFilterPolicy(20); + options.filter_deletes = true; + Reopen(&options); + WriteBatch batch; + + batch.Delete("a"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(AllEntriesFor("a"), "[ ]"); // Delete skipped + batch.Clear(); + + batch.Put("a", "b"); + batch.Delete("a"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(Get("a"), "NOT_FOUND"); + ASSERT_EQ(AllEntriesFor("a"), "[ DEL, b ]"); // Delete issued + batch.Clear(); + + batch.Delete("c"); + batch.Put("c", "d"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(Get("c"), "d"); + ASSERT_EQ(AllEntriesFor("c"), "[ d ]"); // Delete skipped + batch.Clear(); + + dbfull()->Flush(FlushOptions()); // A stray Flush + + batch.Delete("c"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(AllEntriesFor("c"), "[ DEL, d ]"); // Delete issued + batch.Clear(); + + delete options.filter_policy; +} + TEST(DBTest, IterEmpty) { Iterator* iter = db_->NewIterator(ReadOptions()); @@ -3007,7 +3052,13 @@ class ModelDB: public DB { Status::NotSupported("Not implemented.")); return s; } - virtual bool KeyMayExist(const Slice& key) { + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr) { + if (value_found != nullptr) { + *value_found = false; + } return true; // Not Supported directly } virtual Iterator* NewIterator(const ReadOptions& options) { @@ -3058,7 +3109,8 @@ class ModelDB: public DB { sizes[i] = 0; } } - virtual void CompactRange(const Slice* start, const Slice* end) { + virtual void CompactRange(const Slice* start, const Slice* end, + bool reduce_level ) { } virtual int NumberLevels() @@ -3191,6 +3243,9 @@ static bool CompareIterators(int step, TEST(DBTest, Randomized) { Random rnd(test::RandomSeed()); do { + if (CurrentOptions().filter_deletes) { + ChangeOptions(); // DBTest.Randomized not suited for filter_deletes + } ModelDB model(CurrentOptions()); const int N = 10000; const Snapshot* model_snap = nullptr; diff --git a/db/memtable.cc b/db/memtable.cc index cfd2bed04..622058804 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -3,6 +3,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/memtable.h" + +#include + #include "db/dbformat.h" #include "leveldb/comparator.h" #include "leveldb/env.h" @@ -19,23 +22,28 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } -MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) +MemTable::MemTable(const InternalKeyComparator& cmp, + std::shared_ptr table_factory, + int numlevel, + const Options& options) : comparator_(cmp), refs_(0), - table_(comparator_, &arena_), + arena_impl_(options.arena_block_size), + table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), edit_(numlevel), first_seqno_(0), - mem_logfile_number_(0) { -} + mem_logfile_number_(0) { } MemTable::~MemTable() { assert(refs_ == 0); } -size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } +size_t MemTable::ApproximateMemoryUsage() { + return arena_impl_.ApproximateMemoryUsage(); +} int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { @@ -57,24 +65,27 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } - - virtual bool Valid() const { return iter_.Valid(); } - virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } - virtual void SeekToFirst() { iter_.SeekToFirst(); } - virtual void SeekToLast() { iter_.SeekToLast(); } - virtual void Next() { iter_.Next(); } - virtual void Prev() { iter_.Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } + explicit MemTableIterator(MemTableRep* table) + : iter_(table->GetIterator()) { } + + virtual bool Valid() const { return iter_->Valid(); } + virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); } + virtual void SeekToFirst() { iter_->SeekToFirst(); } + virtual void SeekToLast() { iter_->SeekToLast(); } + virtual void Next() { iter_->Next(); } + virtual void Prev() { iter_->Prev(); } + virtual Slice key() const { + return GetLengthPrefixedSlice(iter_->key()); + } virtual Slice value() const { - Slice key_slice = GetLengthPrefixedSlice(iter_.key()); + Slice key_slice = GetLengthPrefixedSlice(iter_->key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: - MemTable::Table::Iterator iter_; + std::shared_ptr iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed @@ -83,7 +94,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(&table_); + return new MemTableIterator(table_.get()); } void MemTable::Add(SequenceNumber s, ValueType type, @@ -100,7 +111,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; - char* buf = arena_.Allocate(encoded_len); + char* buf = arena_impl_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; @@ -109,7 +120,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == (unsigned)encoded_len); - table_.Insert(buf); + table_->Insert(buf); // The first sequence number inserted into the memtable assert(first_seqno_ == 0 || s > first_seqno_); @@ -119,10 +130,10 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options) { Slice memkey = key.memtable_key(); - Table::Iterator iter(&table_); - iter.Seek(memkey.data()); + std::shared_ptr iter(table_.get()->GetIterator()); + iter->Seek(memkey.data()); bool merge_in_progress = false; std::string operand; @@ -131,10 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, merge_in_progress = true; } - auto merge_operator = options.merge_operator; auto logger = options.info_log; - for (; iter.Valid(); iter.Next()) { + for (; iter->Valid(); iter->Next()) { // entry format is: // klength varint32 // userkey char[klength-8] @@ -144,7 +154,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. - const char* entry = iter.key(); + const char* entry = iter->key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); if (comparator_.comparator.user_comparator()->Compare( @@ -164,10 +174,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return true; } case kTypeMerge: { - if (check_presence_only) { - *s = Status::OK(); - return true; - } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); if (merge_in_progress) { merge_operator->Merge(key.user_key(), &v, operand, diff --git a/db/memtable.h b/db/memtable.h index def3a5d3d..73d32fc4c 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -6,24 +6,34 @@ #define STORAGE_LEVELDB_DB_MEMTABLE_H_ #include +#include #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_set.h" -#include "util/arena.h" +#include "leveldb/memtablerep.h" +#include "util/arena_impl.h" namespace leveldb { -class InternalKeyComparator; class Mutex; class MemTableIterator; class MemTable { public: + struct KeyComparator : public MemTableRep::KeyComparator { + const InternalKeyComparator comparator; + explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } + virtual int operator()(const char* a, const char* b) const; + }; + // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. - explicit MemTable(const InternalKeyComparator& comparator, - int numlevel = 7); + explicit MemTable( + const InternalKeyComparator& comparator, + std::shared_ptr table_factory, + int numlevel = 7, + const Options& options = Options()); // Increase reference count. void Ref() { ++refs_; } @@ -63,13 +73,12 @@ class MemTable { // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. // If memtable contains Merge operation as the most recent entry for a key, - // and if check_presence_only is set, return true with Status::OK, - // else if the merge process does not stop (not reaching a value or delete), + // and the merge process does not stop (not reaching a value or delete), // store the current merged result in value and MergeInProgress in s. // return false // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only = false); + const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } @@ -88,22 +97,14 @@ class MemTable { private: ~MemTable(); // Private since only Unref() should be used to delete it - - struct KeyComparator { - const InternalKeyComparator comparator; - explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } - int operator()(const char* a, const char* b) const; - }; friend class MemTableIterator; friend class MemTableBackwardIterator; friend class MemTableList; - typedef SkipList Table; - KeyComparator comparator_; int refs_; - Arena arena_; - Table table_; + ArenaImpl arena_impl_; + shared_ptr table_; // These are used to manage memtable flushes to storage bool flush_in_progress_; // started the flush diff --git a/db/memtablelist.cc b/db/memtablelist.cc index ac89d1043..9d8b675bf 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -194,10 +194,10 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options) { for (list::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s, options, check_presence_only)) { + if ((*it)->Get(key, value, s, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index 40419e56f..b30089cf6 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -8,7 +8,6 @@ #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" -#include "util/arena.h" #include "memtable.h" namespace leveldb { @@ -71,7 +70,7 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only = false); + const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/merge_test.cc b/db/merge_test.cc index 2d2f6514f..903a824a5 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -8,19 +8,29 @@ #include "leveldb/env.h" #include "leveldb/merge_operator.h" #include "db/dbformat.h" +#include "db/db_impl.h" #include "utilities/merge_operators.h" +#include "util/testharness.h" +#include "utilities/utility_db.h" using namespace std; using namespace leveldb; auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); -std::shared_ptr OpenDb() { +std::shared_ptr OpenDb(const string& dbname, const bool ttl = false) { DB* db; Options options; options.create_if_missing = true; options.merge_operator = mergeOperator.get(); - Status s = DB::Open(options, "/tmp/testdb", &db); + Status s; + DestroyDB(dbname, Options()); + if (ttl) { + cout << "Opening database with TTL\n"; + s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db); + } else { + s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); + } if (!s.ok()) { cerr << s.ToString() << endl; assert(false); @@ -45,7 +55,7 @@ class Counters { uint64_t default_; public: - Counters(std::shared_ptr db, uint64_t defaultCount = 0) + explicit Counters(std::shared_ptr db, uint64_t defaultCount = 0) : db_(db), put_option_(), get_option_(), @@ -143,7 +153,7 @@ class MergeBasedCounters : public Counters { WriteOptions merge_option_; // for merge public: - MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) + explicit MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) : Counters(db, defaultCount), merge_option_() { } @@ -227,9 +237,8 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) { } } -int main(int argc, char *argv[]) { - - auto db = OpenDb(); +void runTest(int argc, const string& dbname, const bool use_ttl = false) { + auto db = OpenDb(dbname, use_ttl); { cout << "Test read-modify-write counters... \n"; @@ -249,5 +258,12 @@ int main(int argc, char *argv[]) { testCounters(counters, db.get(), compact); } + DestroyDB(dbname, Options()); +} + +int main(int argc, char *argv[]) { + //TODO: Make this test like a general rocksdb unit-test + runTest(argc, "/tmp/testdb"); + runTest(argc, "/tmp/testdbttl", true); // Run test on TTL database return 0; } diff --git a/db/repair.cc b/db/repair.cc index 09406781a..3737b09c2 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -192,7 +192,8 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.num_levels); + MemTable* mem = new MemTable(icmp_, options_.memtable_factory, + options_.num_levels); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/skiplist.h b/db/skiplist.h index 1c7b4dd71..a3fe05dbb 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -31,13 +31,10 @@ #include #include #include "port/port.h" -#include "util/arena.h" #include "util/random.h" namespace leveldb { -class Arena; - template class SkipList { private: diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index fa8a21a31..3542183f1 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -5,7 +5,7 @@ #include "db/skiplist.h" #include #include "leveldb/env.h" -#include "util/arena.h" +#include "util/arena_impl.h" #include "util/hash.h" #include "util/random.h" #include "util/testharness.h" @@ -29,9 +29,9 @@ struct TestComparator { class SkipTest { }; TEST(SkipTest, Empty) { - Arena arena; + ArenaImpl arena_impl; TestComparator cmp; - SkipList list(cmp, &arena); + SkipList list(cmp, &arena_impl); ASSERT_TRUE(!list.Contains(10)); SkipList::Iterator iter(&list); @@ -49,9 +49,9 @@ TEST(SkipTest, InsertAndLookup) { const int R = 5000; Random rnd(1000); std::set keys; - Arena arena; + ArenaImpl arena_impl; TestComparator cmp; - SkipList list(cmp, &arena); + SkipList list(cmp, &arena_impl); for (int i = 0; i < N; i++) { Key key = rnd.Next() % R; if (keys.insert(key).second) { @@ -204,14 +204,14 @@ class ConcurrentTest { // Current state of the test State current_; - Arena arena_; + ArenaImpl arena_impl_; // SkipList is not protected by mu_. We just use a single writer // thread to modify it. SkipList list_; public: - ConcurrentTest() : list_(TestComparator(), &arena_) { } + ConcurrentTest() : list_(TestComparator(), &arena_impl_) { } // REQUIRES: External synchronization void WriteStep(Random* rnd) { diff --git a/db/skiplistrep.h b/db/skiplistrep.h new file mode 100644 index 000000000..d22768f4d --- /dev/null +++ b/db/skiplistrep.h @@ -0,0 +1,102 @@ +#ifndef STORAGE_LEVELDB_DB_SKIPLISTREP_H_ +#define STORAGE_LEVELDB_DB_SKIPLISTREP_H_ + +#include "leveldb/memtablerep.h" +#include "db/memtable.h" +#include "db/skiplist.h" + +namespace leveldb { + +class Arena; + +class SkipListRep : public MemTableRep { + SkipList skip_list_; +public: + explicit SkipListRep(MemTableRep::KeyComparator& compare, Arena* arena) + : skip_list_(compare, arena) { +} + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + virtual void Insert(const char* key) { + skip_list_.Insert(key); + } + + // Returns true iff an entry that compares equal to key is in the list. + virtual bool Contains(const char* key) const { + return skip_list_.Contains(key); + } + + virtual ~SkipListRep() { } + + // Iteration over the contents of a skip list + class Iterator : public MemTableRep::Iterator { + SkipList::Iterator iter_; + public: + // Initialize an iterator over the specified list. + // The returned iterator is not valid. + explicit Iterator( + const SkipList* list + ) : iter_(list) { } + + virtual ~Iterator() { } + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const { + return iter_.Valid(); + } + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const { + return iter_.key(); + } + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() { + iter_.Next(); + } + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() { + iter_.Prev(); + } + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) { + iter_.Seek(target); + } + + // Position at the first entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToFirst() { + iter_.SeekToFirst(); + } + + // Position at the last entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToLast() { + iter_.SeekToLast(); + } + }; + + virtual std::shared_ptr GetIterator() { + return std::shared_ptr( + new SkipListRep::Iterator(&skip_list_) + ); + } +}; + +class SkipListFactory : public MemTableRepFactory { +public: + virtual std::shared_ptr CreateMemTableRep ( + MemTableRep::KeyComparator& compare, Arena* arena) { + return std::shared_ptr(new SkipListRep(compare, arena)); + } +}; + +} + +#endif // STORAGE_LEVELDB_DB_SKIPLISTREP_H_ diff --git a/db/table_cache.cc b/db/table_cache.cc index 4cc105afe..02408d95c 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -39,15 +39,19 @@ TableCache::~TableCache() { Status TableCache::FindTable(const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, - Cache::Handle** handle, bool* tableIO) { + Cache::Handle** handle, bool* table_io, + const bool no_io) { Status s; char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); Slice key(buf, sizeof(buf)); *handle = cache_->Lookup(key); if (*handle == nullptr) { - if (tableIO != nullptr) { - *tableIO = true; // we had to do IO from storage + if (no_io) { // Dont do IO and return a not-found status + return Status::NotFound("Table not found in table_cache, no_io is set"); + } + if (table_io != nullptr) { + *table_io = true; // we had to do IO from storage } std::string fname = TableFileName(dbname_, file_number); unique_ptr file; @@ -112,17 +116,21 @@ Status TableCache::Get(const ReadOptions& options, const Slice& k, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), - bool* tableIO, + bool* table_io, void (*mark_key_may_exist)(void*), - const bool no_IO) { + const bool no_io) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, - &handle, tableIO); + &handle, table_io, no_io); if (s.ok()) { Table* t = reinterpret_cast(cache_->Value(handle)); - s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_IO); + s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io); cache_->Release(handle); + } else if (no_io && s.IsNotFound()) { + // Couldnt find Table in cache but treat as kFound if no_io set + (*mark_key_may_exist)(arg); + return Status::OK(); } return s; } diff --git a/db/table_cache.h b/db/table_cache.h index 737e53c9e..2f3787609 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -48,9 +48,9 @@ class TableCache { const Slice& k, void* arg, bool (*handle_result)(void*, const Slice&, const Slice&, bool), - bool* tableIO, + bool* table_io, void (*mark_key_may_exist)(void*) = nullptr, - const bool no_IO = false); + const bool no_io = false); // Evict any entry for the specified file number void Evict(uint64_t file_number); @@ -62,9 +62,9 @@ class TableCache { const EnvOptions& storage_options_; std::shared_ptr cache_; - Status FindTable(const EnvOptions& toptions, - uint64_t file_number, uint64_t file_size, Cache::Handle**, - bool* tableIO = nullptr); + Status FindTable(const EnvOptions& toptions, uint64_t file_number, + uint64_t file_size, Cache::Handle**, bool* table_io=nullptr, + const bool no_io = false); }; } // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 79965c962..0d04d2f52 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -239,6 +239,7 @@ struct Saver { SaverState state; const Comparator* ucmp; Slice user_key; + bool* value_found; // Is value set correctly? Used by KeyMayExist std::string* value; const MergeOperator* merge_operator; Logger* logger; @@ -246,13 +247,17 @@ struct Saver { }; } -// Called from TableCache::Get when bloom-filters can't guarantee that key does -// not exist and Get is not permitted to do IO to read the data-block and be -// certain. -// Set the key as Found and let the caller know that key-may-exist +// Called from TableCache::Get and InternalGet when file/block in which key may +// exist are not there in TableCache/BlockCache respectively. In this case we +// can't guarantee that key does not exist and are not permitted to do IO to be +// certain.Set the status=kFound and value_found=false to let the caller know +// that key may exist but is not there in memory static void MarkKeyMayExist(void* arg) { Saver* s = reinterpret_cast(arg); s->state = kFound; + if (s->value_found != nullptr) { + *(s->value_found) = false; + } } static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ @@ -348,7 +353,8 @@ void Version::Get(const ReadOptions& options, Status *status, GetStats* stats, const Options& db_options, - const bool no_IO) { + const bool no_io, + bool* value_found) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); @@ -357,13 +363,14 @@ void Version::Get(const ReadOptions& options, auto logger = db_options.info_log; assert(status->ok() || status->IsMergeInProgress()); - if (no_IO) { + if (no_io) { assert(status->ok()); } Saver saver; saver.state = status->ok()? kNotFound : kMerge; saver.ucmp = ucmp; saver.user_key = user_key; + saver.value_found = value_found; saver.value = value; saver.merge_operator = merge_operator; saver.logger = logger.get(); @@ -432,7 +439,7 @@ void Version::Get(const ReadOptions& options, bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, - MarkKeyMayExist, no_IO); + MarkKeyMayExist, no_io); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; diff --git a/db/version_set.h b/db/version_set.h index 342bda711..ca8a24afc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -75,7 +75,7 @@ class Version { }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, GetStats* stats, const Options& db_option, - const bool no_IO = false); + const bool no_io = false, bool* value_found = nullptr); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. @@ -136,6 +136,7 @@ class Version { private: friend class Compaction; friend class VersionSet; + friend class DBImpl; class LevelFileNumIterator; Iterator* NewConcatenatingIterator(const ReadOptions&, diff --git a/db/write_batch.cc b/db/write_batch.cc index 4ca4819fd..a4213cd97 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -16,9 +16,12 @@ #include "leveldb/write_batch.h" -#include "leveldb/db.h" +#include "leveldb/options.h" +#include "leveldb/statistics.h" #include "db/dbformat.h" +#include "db/db_impl.h" #include "db/memtable.h" +#include "db/snapshot.h" #include "db/write_batch_internal.h" #include "util/coding.h" #include @@ -139,6 +142,23 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; + const Options* options_; + DBImpl* db_; + const bool filter_deletes_; + + MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts, + DB* db, const bool filter_deletes) + : sequence_(sequence), + mem_(mem), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { + assert(mem_); + if (filter_deletes_) { + assert(options_); + assert(db_); + } + } virtual void Put(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeValue, key, value); @@ -149,17 +169,28 @@ class MemTableInserter : public WriteBatch::Handler { sequence_++; } virtual void Delete(const Slice& key) { + if (filter_deletes_) { + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + std::string value; + if (!db_->KeyMayExist(ropts, key, &value)) { + RecordTick(options_->statistics, NUMBER_FILTERED_DELETES); + return; + } + } mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; } // namespace -Status WriteBatchInternal::InsertInto(const WriteBatch* b, - MemTable* memtable) { - MemTableInserter inserter; - inserter.sequence_ = WriteBatchInternal::Sequence(b); - inserter.mem_ = memtable; +Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem, + const Options* opts, DB* db, + const bool filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db, + filter_deletes); return b->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index eb37733c2..649752ce6 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -7,6 +7,8 @@ #include "leveldb/types.h" #include "leveldb/write_batch.h" +#include "leveldb/db.h" +#include "leveldb/options.h" namespace leveldb { @@ -39,7 +41,12 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); - static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + // Inserts batch entries into memtable + // Drops deletes in batch if filter_del is set to true and + // db->KeyMayExist returns false + static Status InsertInto(const WriteBatch* batch, MemTable* memtable, + const Options* opts = nullptr, DB* db = nullptr, + const bool filter_del = false); static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index d17a08e8e..945ef16bd 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -4,6 +4,8 @@ #include "leveldb/db.h" +#include +#include "db/skiplistrep.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/env.h" @@ -14,7 +16,8 @@ namespace leveldb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* mem = new MemTable(cmp); + auto factory = std::make_shared(); + MemTable* mem = new MemTable(cmp, factory); mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem); diff --git a/include/leveldb/arena.h b/include/leveldb/arena.h new file mode 100644 index 000000000..6e3a1f00b --- /dev/null +++ b/include/leveldb/arena.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Arena class defines memory allocation methods. It's used by memtable and +// skiplist. + +#ifndef STORAGE_LEVELDB_INCLUDE_ARENA_H_ +#define STORAGE_LEVELDB_INCLUDE_ARENA_H_ + +namespace leveldb { + +class Arena { + public: + Arena() {}; + virtual ~Arena() {}; + + // Return a pointer to a newly allocated memory block of "bytes" bytes. + virtual char* Allocate(size_t bytes) = 0; + + // Allocate memory with the normal alignment guarantees provided by malloc. + virtual char* AllocateAligned(size_t bytes) = 0; + + // Returns an estimate of the total memory used by arena. + virtual const size_t ApproximateMemoryUsage() = 0; + + // Returns the total number of bytes in all blocks allocated so far. + virtual const size_t MemoryAllocatedBytes() = 0; + + private: + // No copying allowed + Arena(const Arena&); + void operator=(const Arena&); +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_ARENA_H_ diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 056920d9e..0c056c362 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -104,7 +104,8 @@ class DB { // // May return some other Status on an error. virtual Status Get(const ReadOptions& options, - const Slice& key, std::string* value) = 0; + const Slice& key, + std::string* value) = 0; // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and @@ -121,9 +122,21 @@ class DB { std::vector* values) = 0; // If the key definitely does not exist in the database, then this method - // returns false. Otherwise return true. This check is potentially - // lighter-weight than invoking DB::Get(). No IO is performed - virtual bool KeyMayExist(const Slice& key) = 0; + // returns false, else true. If the caller wants to obtain value when the key + // is found in memory, a bool for 'value_found' must be passed. 'value_found' + // will be true on return if value has been set properly. + // This check is potentially lighter-weight than invoking DB::Get(). One way + // to make this lighter weight is to avoid doing any IOs. + // Default implementation here returns true and sets 'value_found' to false + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr) { + if (value_found != nullptr) { + *value_found = false; + } + return true; + } // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must @@ -180,7 +193,14 @@ class DB { // end==nullptr is treated as a key after all keys in the database. // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); - virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + // Note that after the entire database is compacted, all data are pushed + // down to the last level containing any data. If the total data size + // after compaction is reduced, that level might not be appropriate for + // hosting all the files. In this case, client could set reduce_level + // to true, to move the files back to the minimum level capable of holding + // the data set. + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false) = 0; // Number of levels used for this DB. virtual int NumberLevels() = 0; diff --git a/include/leveldb/memtablerep.h b/include/leveldb/memtablerep.h new file mode 100644 index 000000000..bf769f543 --- /dev/null +++ b/include/leveldb/memtablerep.h @@ -0,0 +1,88 @@ +// This file contains the interface that must be implemented by any collection +// to be used as the backing store for a MemTable. Such a collection must +// satisfy the following properties: +// (1) It does not store duplicate items. +// (2) It uses MemTableRep::KeyComparator to compare items for iteration and +// equality. +// (3) It can be accessed concurrently by multiple readers but need not support +// concurrent writes. +// (4) Items are never deleted. +// The liberal use of assertions is encouraged to enforce (1). + +#ifndef STORAGE_LEVELDB_DB_TABLE_H_ +#define STORAGE_LEVELDB_DB_TABLE_H_ + +#include +#include "leveldb/arena.h" + +namespace leveldb { + +class MemTableRep { + public: + // KeyComparator(a, b) returns a negative value if a is less than b, 0 if they + // are equal, and a positive value if b is greater than a + class KeyComparator { + public: + virtual int operator()(const char* a, const char* b) const = 0; + virtual ~KeyComparator() { } + }; + + // Insert key into the collection. (The caller will pack key and value into a + // single buffer and pass that in as the parameter to Insert) + // REQUIRES: nothing that compares equal to key is currently in the + // collection. + virtual void Insert(const char* key) = 0; + + // Returns true iff an entry that compares equal to key is in the collection. + virtual bool Contains(const char* key) const = 0; + + virtual ~MemTableRep() { } + + // Iteration over the contents of a skip collection + class Iterator { + public: + // Initialize an iterator over the specified collection. + // The returned iterator is not valid. + // explicit Iterator(const MemTableRep* collection); + virtual ~Iterator() { }; + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const = 0; + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const = 0; + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() = 0; + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) = 0; + + // Position at the first entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToFirst() = 0; + + // Position at the last entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToLast() = 0; + }; + + virtual std::shared_ptr GetIterator() = 0; +}; + +class MemTableRepFactory { + public: + virtual ~MemTableRepFactory() { }; + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator&, Arena* arena) = 0; +}; + +} + +#endif // STORAGE_LEVELDB_DB_TABLE_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index a94a17a22..f12734ee1 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -13,6 +13,7 @@ #include "leveldb/slice.h" #include "leveldb/statistics.h" #include "leveldb/universal_compaction.h" +#include "leveldb/memtablerep.h" namespace leveldb { @@ -224,9 +225,9 @@ struct Options { // level-0 compaction will not be triggered by number of files at all. int level0_file_num_compaction_trigger; - // Soft limit on number of level-0 files. We slow down writes at this point. - // A value <0 means that no writing slow down will be triggered by number - // of files in level-0. + // Soft limit on number of level-0 files. We start slowing down writes at this + // point. A value <0 means that no writing slow down will be triggered by + // number of files in level-0. int level0_slowdown_writes_trigger; // Maximum number of level-0 files. We stop writes at this point. @@ -380,6 +381,13 @@ struct Options { // Number of shards used for table cache. int table_cache_numshardbits; + // size of one block in arena memory allocation. + // If <= 0, a proper value is automatically calculated (usually 1/10 of + // writer_buffer_size). + // + // Default: 0 + size_t arena_block_size; + // Create an Options object with default values for all fields. Options(); @@ -477,14 +485,17 @@ struct Options { // The options needed to support Universal Style compactions CompactionOptionsUniversal compaction_options_universal; - // Use bloom-filter for deletes when this is true. - // db->Delete first calls KeyMayExist which checks memtable,immutable-memtable - // and bloom-filters to determine if the key does not exist in the database. - // If the key definitely does not exist, then the delete is a noop.KeyMayExist - // only incurs in-memory look up. This optimization avoids writing the delete - // to storage when appropriate. + // Use KeyMayExist API to filter deletes when this is true. + // If KeyMayExist returns false, i.e. the key definitely does not exist, then + // the delete is a noop. KeyMayExist only incurs in-memory look up. + // This optimization avoids writing the delete to storage when appropriate. // Default: false - bool deletes_check_filter_first; + bool filter_deletes; + + // This is a factory that provides MemTableRep objects. + // Default: a factory that provides a skip-list-based implementation of + // MemTableRep. + std::shared_ptr memtable_factory; }; // Options that control read operations diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 928ae5a14..bd21f7e32 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -80,7 +80,7 @@ const std::vector> TickersNameMap = { { STALL_L0_SLOWDOWN_MICROS, "rocksdb.l0.slowdown.micros" }, { STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros" }, { STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros" }, - { RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.dleay.millis" }, + { RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis" }, { NO_ITERATORS, "rocksdb.num.iterators" }, { NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get" }, { NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read" }, @@ -109,8 +109,13 @@ enum Histograms { READ_BLOCK_COMPACTION_MICROS = 9, READ_BLOCK_GET_MICROS = 10, WRITE_RAW_BLOCK_MICROS = 11, - NUM_FILES_IN_SINGLE_COMPACTION = 12, - HISTOGRAM_ENUM_MAX = 13 + + STALL_L0_SLOWDOWN_COUNT = 12, + STALL_MEMTABLE_COMPACTION_COUNT = 13, + STALL_L0_NUM_FILES_COUNT = 14, + RATE_LIMIT_DELAY_COUNT = 15, + NUM_FILES_IN_SINGLE_COMPACTION = 16, + HISTOGRAM_ENUM_MAX = 17 }; const std::vector> HistogramsNameMap = { @@ -126,6 +131,10 @@ const std::vector> HistogramsNameMap = { { READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros" }, { READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros" }, { WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" }, + { STALL_L0_SLOWDOWN_COUNT, "rocksdb.l0.slowdown.count"}, + { STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"}, + { STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"}, + { RATE_LIMIT_DELAY_COUNT, "rocksdb.rate.limit.delay.count"}, { NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" } }; diff --git a/table/table.cc b/table/table.cc index f7b664a4f..80c5ef491 100644 --- a/table/table.cc +++ b/table/table.cc @@ -235,7 +235,8 @@ Iterator* Table::BlockReader(void* arg, const ReadOptions& options, const Slice& index_value, bool* didIO, - bool for_compaction) { + bool for_compaction, + const bool no_io) { Table* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); std::shared_ptr statistics = table->rep_->options.statistics; @@ -264,6 +265,8 @@ Iterator* Table::BlockReader(void* arg, block = reinterpret_cast(block_cache->Value(cache_handle)); RecordTick(statistics, BLOCK_CACHE_HIT); + } else if (no_io) { + return nullptr; // Did not find in block_cache and can't do IO } else { Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; @@ -286,7 +289,9 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_MISS); } - } else { + } else if (no_io) { + return nullptr; // Could not read from block_cache and can't do IO + }else { s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO); } } @@ -324,7 +329,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, bool (*saver)(void*, const Slice&, const Slice&, bool), void (*mark_key_may_exist)(void*), - const bool no_IO) { + const bool no_io) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); bool done = false; @@ -340,16 +345,17 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, // cross one data block, we should be fine. RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL); break; - } else if (no_IO) { - // Update Saver.state to Found because we are only looking for whether - // bloom-filter can guarantee the key is not there when "no_IO" - (*mark_key_may_exist)(arg); - done = true; } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), - &didIO); + &didIO, no_io); + if (no_io && !block_iter) { // couldn't get block from block_cache + // Update Saver.state to Found because we are only looking for whether + // we can guarantee the key is not there when "no_io" is set + (*mark_key_may_exist)(arg); + break; + } for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) { if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) { done = true; diff --git a/table/table.h b/table/table.h index 4674e262b..b39a5c186 100644 --- a/table/table.h +++ b/table/table.h @@ -77,7 +77,8 @@ class Table { const EnvOptions& soptions, const Slice&, bool for_compaction); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false); + bool* didIO, bool for_compaction = false, + const bool no_io = false); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. @@ -88,7 +89,7 @@ class Table { void* arg, bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), void (*mark_key_may_exist)(void*) = nullptr, - const bool no_IO = false); + const bool no_io = false); void ReadMeta(const Footer& footer); diff --git a/table/table_test.cc b/table/table_test.cc index a2bba940d..118ffa232 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3,8 +3,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include +#include #include "db/dbformat.h" #include "db/memtable.h" +#include "db/skiplistrep.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -342,8 +344,9 @@ class MemTableConstructor: public Constructor { public: explicit MemTableConstructor(const Comparator* cmp) : Constructor(cmp), - internal_comparator_(cmp) { - memtable_ = new MemTable(internal_comparator_); + internal_comparator_(cmp), + table_factory_(new SkipListFactory) { + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); } ~MemTableConstructor() { @@ -351,7 +354,7 @@ class MemTableConstructor: public Constructor { } virtual Status FinishImpl(const Options& options, const KVMap& data) { memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_); + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -369,6 +372,7 @@ class MemTableConstructor: public Constructor { private: InternalKeyComparator internal_comparator_; MemTable* memtable_; + std::shared_ptr table_factory_; }; class DBConstructor: public Constructor { @@ -805,7 +809,8 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* memtable = new MemTable(cmp); + auto table_factory = std::make_shared(); + MemTable* memtable = new MemTable(cmp, table_factory); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 0f5803713..187f45995 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -79,7 +79,8 @@ def main(argv): ' --target_file_size_multiplier=2 ' + \ ' --max_write_buffer_number=3 ' + \ ' --max_background_compactions=20 ' + \ - ' --max_bytes_for_level_base=10485760' + ' --max_bytes_for_level_base=10485760 ' + \ + ' --filter_deletes=' + str(random.randint(0, 1)) killtime = time.time() + interval child = subprocess.Popen(['./db_stress \ --test_batches_snapshots=1 \ diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index de599f1b5..2398efd91 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -84,7 +84,8 @@ def main(argv): ' --target_file_size_multiplier=2 ' + \ ' --max_write_buffer_number=3 ' + \ ' --max_background_compactions=20 ' + \ - ' --max_bytes_for_level_base=10485760' + ' --max_bytes_for_level_base=10485760 ' + \ + ' --filter_deletes=' + str(random.randint(0, 1)) print ("Running db_stress with additional options=\n" + additional_opts + "\n") diff --git a/tools/db_stress.cc b/tools/db_stress.cc index fee424359..b9cd20a02 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -183,8 +183,8 @@ static uint32_t FLAGS_log2_keys_per_lock = 2; // implies 2^2 keys per lock // Percentage of times we want to purge redundant keys in memory before flushing static uint32_t FLAGS_purge_redundant_percent = 50; -// On true, deletes use bloom-filter and drop the delete if key not present -static bool FLAGS_deletes_check_filter_first = false; +// On true, deletes use KeyMayExist to drop the delete if key not present +static bool FLAGS_filter_deletes = false; // Level0 compaction start trigger static int FLAGS_level0_file_num_compaction_trigger = 0; @@ -907,7 +907,7 @@ class StressTest { fprintf(stdout, "Purge redundant %% : %d\n", FLAGS_purge_redundant_percent); fprintf(stdout, "Deletes use filter : %d\n", - FLAGS_deletes_check_filter_first); + FLAGS_filter_deletes); fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); @@ -964,7 +964,7 @@ class StressTest { options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; options.max_manifest_file_size = 1024; - options.deletes_check_filter_first = FLAGS_deletes_check_filter_first; + options.filter_deletes = FLAGS_filter_deletes; static Random purge_percent(1000); // no benefit from non-determinism here if (purge_percent.Uniform(100) < FLAGS_purge_redundant_percent - 1) { options.purge_redundant_kvs_while_flush = false; @@ -1168,9 +1168,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--purge_redundant_percent=%d%c", &n, &junk) == 1 && (n >= 0 && n <= 100)) { FLAGS_purge_redundant_percent = n; - } else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk) + } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - FLAGS_deletes_check_filter_first = n; + FLAGS_filter_deletes = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/util/arena.cc b/util/arena_impl.cc similarity index 74% rename from util/arena.cc rename to util/arena_impl.cc index a339f4055..6d39a80d7 100644 --- a/util/arena.cc +++ b/util/arena_impl.cc @@ -2,27 +2,32 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/arena.h" -#include +#include "util/arena_impl.h" namespace leveldb { -static const int kBlockSize = 4096; +ArenaImpl::ArenaImpl(size_t block_size) { + if (block_size < kMinBlockSize) { + block_size_ = kMinBlockSize; + } else if (block_size > kMaxBlockSize) { + block_size_ = kMaxBlockSize; + } else { + block_size_ = block_size; + } -Arena::Arena() { blocks_memory_ = 0; alloc_ptr_ = nullptr; // First allocation will allocate a block alloc_bytes_remaining_ = 0; } -Arena::~Arena() { +ArenaImpl::~ArenaImpl() { for (size_t i = 0; i < blocks_.size(); i++) { delete[] blocks_[i]; } } -char* Arena::AllocateFallback(size_t bytes) { - if (bytes > kBlockSize / 4) { +char* ArenaImpl::AllocateFallback(size_t bytes) { + if (bytes > block_size_ / 4) { // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. char* result = AllocateNewBlock(bytes); @@ -30,8 +35,8 @@ char* Arena::AllocateFallback(size_t bytes) { } // We waste the remaining space in the current block. - alloc_ptr_ = AllocateNewBlock(kBlockSize); - alloc_bytes_remaining_ = kBlockSize; + alloc_ptr_ = AllocateNewBlock(block_size_); + alloc_bytes_remaining_ = block_size_; char* result = alloc_ptr_; alloc_ptr_ += bytes; @@ -39,7 +44,7 @@ char* Arena::AllocateFallback(size_t bytes) { return result; } -char* Arena::AllocateAligned(size_t bytes) { +char* ArenaImpl::AllocateAligned(size_t bytes) { const int align = sizeof(void*); // We'll align to pointer size assert((align & (align-1)) == 0); // Pointer size should be a power of 2 size_t current_mod = reinterpret_cast(alloc_ptr_) & (align-1); @@ -58,7 +63,7 @@ char* Arena::AllocateAligned(size_t bytes) { return result; } -char* Arena::AllocateNewBlock(size_t block_bytes) { +char* ArenaImpl::AllocateNewBlock(size_t block_bytes) { char* result = new char[block_bytes]; blocks_memory_ += block_bytes; blocks_.push_back(result); diff --git a/util/arena.h b/util/arena_impl.h similarity index 55% rename from util/arena.h rename to util/arena_impl.h index 8f7dde226..a5425e87a 100644 --- a/util/arena.h +++ b/util/arena_impl.h @@ -2,38 +2,53 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_ -#define STORAGE_LEVELDB_UTIL_ARENA_H_ +// ArenaImpl is an implementation of Arena class. For a request of small size, +// it allocates a block with pre-defined block size. For a request of big +// size, it uses malloc to directly get the requested size. + +#ifndef STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ +#define STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ #include #include #include #include +#include "leveldb/arena.h" namespace leveldb { -class Arena { +class ArenaImpl : public Arena { public: - Arena(); - ~Arena(); + explicit ArenaImpl(size_t block_size = kMinBlockSize); + virtual ~ArenaImpl(); - // Return a pointer to a newly allocated memory block of "bytes" bytes. - char* Allocate(size_t bytes); + virtual char* Allocate(size_t bytes); - // Allocate memory with the normal alignment guarantees provided by malloc - char* AllocateAligned(size_t bytes); + virtual char* AllocateAligned(size_t bytes); // Returns an estimate of the total memory usage of data allocated // by the arena (including space allocated but not yet used for user // allocations). - size_t MemoryUsage() const { + // + // TODO: Do we need to exclude space allocated but not used? + virtual const size_t ApproximateMemoryUsage() { return blocks_memory_ + blocks_.capacity() * sizeof(char*); } + virtual const size_t MemoryAllocatedBytes() { + return blocks_memory_; + } + private: char* AllocateFallback(size_t bytes); char* AllocateNewBlock(size_t block_bytes); + static const size_t kMinBlockSize = 4096; + static const size_t kMaxBlockSize = 2 << 30; + + // Number of bytes allocated in one block + size_t block_size_; + // Allocation state char* alloc_ptr_; size_t alloc_bytes_remaining_; @@ -45,11 +60,11 @@ class Arena { size_t blocks_memory_; // No copying allowed - Arena(const Arena&); - void operator=(const Arena&); + ArenaImpl(const ArenaImpl&); + void operator=(const ArenaImpl&); }; -inline char* Arena::Allocate(size_t bytes) { +inline char* ArenaImpl::Allocate(size_t bytes) { // The semantics of what to return are a bit messy if we allow // 0-byte allocations, so we disallow them here (we don't need // them for our internal use). @@ -65,4 +80,4 @@ inline char* Arena::Allocate(size_t bytes) { } // namespace leveldb -#endif // STORAGE_LEVELDB_UTIL_ARENA_H_ +#endif // STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ diff --git a/util/arena_test.cc b/util/arena_test.cc index d5c33d75b..13c6e9391 100644 --- a/util/arena_test.cc +++ b/util/arena_test.cc @@ -2,22 +2,59 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/arena.h" - +#include "util/arena_impl.h" #include "util/random.h" #include "util/testharness.h" namespace leveldb { -class ArenaTest { }; +class ArenaImplTest { }; + +TEST(ArenaImplTest, Empty) { + ArenaImpl arena0; +} + +TEST(ArenaImplTest, MemoryAllocatedBytes) { + const int N = 17; + size_t req_sz; //requested size + size_t bsz = 8192; // block size + size_t expected_memory_allocated; -TEST(ArenaTest, Empty) { - Arena arena; + ArenaImpl arena_impl(bsz); + + // requested size > quarter of a block: + // allocate requested size separately + req_sz = 3001; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated = req_sz * N; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); + + // requested size < quarter of a block: + // allocate a block with the default size, then try to use unused part + // of the block. So one new block will be allocated for the first + // Allocate(99) call. All the remaining calls won't lead to new allocation. + req_sz = 99; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated += bsz; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); + + // requested size > quarter of a block: + // allocate requested size separately + req_sz = 99999999; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated += req_sz * N; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); } -TEST(ArenaTest, Simple) { +TEST(ArenaImplTest, Simple) { std::vector > allocated; - Arena arena; + ArenaImpl arena_impl; const int N = 100000; size_t bytes = 0; Random rnd(301); @@ -35,9 +72,9 @@ TEST(ArenaTest, Simple) { } char* r; if (rnd.OneIn(10)) { - r = arena.AllocateAligned(s); + r = arena_impl.AllocateAligned(s); } else { - r = arena.Allocate(s); + r = arena_impl.Allocate(s); } for (unsigned int b = 0; b < s; b++) { @@ -46,9 +83,9 @@ TEST(ArenaTest, Simple) { } bytes += s; allocated.push_back(std::make_pair(s, r)); - ASSERT_GE(arena.MemoryUsage(), bytes); + ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes); if (i > N/10) { - ASSERT_LE(arena.MemoryUsage(), bytes * 1.10); + ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10); } } for (unsigned int i = 0; i < allocated.size(); i++) { diff --git a/util/options.cc b/util/options.cc index c2c7e15eb..089816bf3 100644 --- a/util/options.cc +++ b/util/options.cc @@ -12,6 +12,7 @@ #include "leveldb/env.h" #include "leveldb/filter_policy.h" #include "leveldb/merge_operator.h" +#include "db/skiplistrep.h" namespace leveldb { @@ -60,6 +61,7 @@ Options::Options() max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), + arena_block_size(0), disable_auto_compactions(false), WAL_ttl_seconds(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -76,7 +78,9 @@ Options::Options() use_adaptive_mutex(false), bytes_per_sync(0), compaction_style(kCompactionStyleLevel), - deletes_check_filter_first(false) { + filter_deletes(false), + memtable_factory(std::shared_ptr(new SkipListFactory)) { + assert(memtable_factory.get() != nullptr); } static const char* const access_hints[] = { @@ -172,6 +176,8 @@ Options::Dump(Logger* log) const no_block_cache); Log(log," Options.table_cache_numshardbits: %d", table_cache_numshardbits); + Log(log," Options.arena_block_size: %ld", + arena_block_size); Log(log," Options.delete_obsolete_files_period_micros: %ld", delete_obsolete_files_period_micros); Log(log," Options.max_background_compactions: %d", @@ -210,10 +216,10 @@ Options::Dump(Logger* log) const use_adaptive_mutex); Log(log," Options.bytes_per_sync: %ld", bytes_per_sync); + Log(log," Options.filter_deletes: %d", + filter_deletes); Log(log," Options.compaction_style: %d", compaction_style); - Log(log," Options.deletes_check_filter_first: %d", - deletes_check_filter_first); Log(log," Options.compaction_options_universal.size_ratio: %d", compaction_options_universal.size_ratio); Log(log," Options.compaction_options_universal.min_merge_width: %d", diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index eff675340..a4a7134de 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -21,6 +21,10 @@ DBWithTTL::DBWithTTL(const int32_t ttl, assert(options.compaction_filter == nullptr); Options options_to_open = options; options_to_open.compaction_filter = this; + if (options.merge_operator) { + ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); + options_to_open.merge_operator = ttl_merge_op_.get(); + } if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); } else { @@ -125,15 +129,12 @@ Status DBWithTTL::StripTS(std::string* str) { } Status DBWithTTL::Put( - const WriteOptions& o, + const WriteOptions& opt, const Slice& key, const Slice& val) { - std::string value_with_ts; - Status st = AppendTS(val, value_with_ts); - if (!st.ok()) { - return st; - } - return db_->Put(o, key, value_with_ts); + WriteBatch batch; + batch.Put(key, val); + return Write(opt, &batch); } Status DBWithTTL::Get(const ReadOptions& options, @@ -158,18 +159,23 @@ std::vector DBWithTTL::MultiGet(const ReadOptions& options, supported with TTL")); } -bool DBWithTTL::KeyMayExist(const Slice& key) { - return db_->KeyMayExist(key); +bool DBWithTTL::KeyMayExist(ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found) { + return db_->KeyMayExist(options, key, value, value_found); } Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } -Status DBWithTTL::Merge(const WriteOptions& options, +Status DBWithTTL::Merge(const WriteOptions& opt, const Slice& key, const Slice& value) { - return Status::NotSupported("Merge operation not supported."); + WriteBatch batch; + batch.Merge(key, value); + return Write(opt, &batch); } Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { @@ -187,8 +193,13 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } } virtual void Merge(const Slice& key, const Slice& value) { - // TTL doesn't support merge operation - batch_rewrite_status = Status::NotSupported("TTL doesn't support Merge"); + std::string value_with_ts; + Status st = AppendTS(value, value_with_ts); + if (!st.ok()) { + batch_rewrite_status = st; + } else { + updates_ttl.Merge(key, value_with_ts); + } } virtual void Delete(const Slice& key) { updates_ttl.Delete(key); @@ -223,8 +234,9 @@ void DBWithTTL::GetApproximateSizes(const Range* r, int n, uint64_t* sizes) { db_->GetApproximateSizes(r, n, sizes); } -void DBWithTTL::CompactRange(const Slice* begin, const Slice* end) { - db_->CompactRange(begin, end); +void DBWithTTL::CompactRange(const Slice* begin, const Slice* end, + bool reduce_level) { + db_->CompactRange(begin, end, reduce_level); } int DBWithTTL::NumberLevels() { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index d24efbe48..3b8ba8e95 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -5,8 +5,10 @@ #ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_ -#include "include/leveldb/db.h" -#include "include/leveldb/compaction_filter.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/compaction_filter.h" +#include "leveldb/merge_operator.h" #include "db/db_impl.h" namespace leveldb { @@ -33,7 +35,10 @@ class DBWithTTL : public DB, CompactionFilter { const std::vector& keys, std::vector* values); - virtual bool KeyMayExist(const Slice& key); + virtual bool KeyMayExist(ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr); virtual Status Delete(const WriteOptions& wopts, const Slice& key); @@ -54,7 +59,8 @@ class DBWithTTL : public DB, CompactionFilter { virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end); + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false); virtual int NumberLevels(); @@ -106,6 +112,7 @@ class DBWithTTL : public DB, CompactionFilter { private: DB* db_; int32_t ttl_; + unique_ptr ttl_merge_op_; }; class TtlIterator : public Iterator { @@ -169,5 +176,56 @@ class TtlIterator : public Iterator { Iterator* iter_; }; +class TtlMergeOperator : public MergeOperator { + + public: + explicit TtlMergeOperator(const MergeOperator* merge_op) + : user_merge_op_(merge_op) { + assert(merge_op); + } + + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const { + const uint32_t& ts_len = DBWithTTL::kTSLength; + if ((existing_value && existing_value->size() < ts_len) || + value.size() < ts_len) { + Log(logger, "Error: Could not remove timestamp correctly from value."); + assert(false); + //TODO: Remove assert and make this function return false. + //TODO: Change Merge semantics and add a counter here + } + Slice value_without_ts(value.data(), value.size() - ts_len); + if (existing_value) { + Slice existing_value_without_ts(existing_value->data(), + existing_value->size() - ts_len); + user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts, + new_value, logger); + } else { + user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger); + } + int32_t curtime; + if (!DBWithTTL::GetCurrentTime(curtime).ok()) { + Log(logger, "Error: Could not get current time to be attached internally " + "to the new value."); + assert(false); + //TODO: Remove assert and make this function return false. + } else { + char ts_string[ts_len]; + EncodeFixed32(ts_string, curtime); + new_value->append(ts_string, ts_len); + } + } + + virtual const char* Name() const { + return "Merge By TTL"; + } + + private: + const MergeOperator* user_merge_op_; +}; + } #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_