diff --git a/Makefile b/Makefile index ae1ee56f2..e6315e926 100644 --- a/Makefile +++ b/Makefile @@ -134,13 +134,12 @@ endif # PLATFORM_SHARED_EXT all: $(LIBRARY) $(PROGRAMS) .PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \ - release tags valgrind_check whitebox_crash_test format + release tags valgrind_check whitebox_crash_test format shared_lib # Will also generate shared libraries. release: $(MAKE) clean OPT="-DNDEBUG -O2" $(MAKE) all -j32 - OPT="-DNDEBUG -O2" $(MAKE) $(SHARED) -j32 coverage: $(MAKE) clean @@ -200,6 +199,8 @@ tags: format: build_tools/format-diff.sh +shared_lib: $(SHARED) + # --------------------------------------------------------------------------- # Unit tests and tools # --------------------------------------------------------------------------- diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index cfa3770d7..9582b6a29 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/compaction_picker.h" + +#include #include "util/statistics.h" namespace rocksdb { @@ -22,6 +24,21 @@ uint64_t TotalFileSize(const std::vector& files) { return sum; } +// Multiple two operands. If they overflow, return op1. +uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { + if (op1 == 0) { + return 0; + } + if (op2 <= 0) { + return op1; + } + uint64_t casted_op2 = (uint64_t) op2; + if (std::numeric_limits::max() / op1 < casted_op2) { + return op1; + } + return op1 * casted_op2; +} + } // anonymous namespace CompactionPicker::CompactionPicker(const Options* options, @@ -30,15 +47,7 @@ CompactionPicker::CompactionPicker(const Options* options, options_(options), num_levels_(options->num_levels), icmp_(icmp) { - Init(); -} - -void CompactionPicker::ReduceNumberOfLevels(int new_levels) { - num_levels_ = new_levels; - Init(); -} -void CompactionPicker::Init() { max_file_size_.reset(new uint64_t[NumberLevels()]); level_max_bytes_.reset(new uint64_t[NumberLevels()]); int target_file_size_multiplier = options_->target_file_size_multiplier; @@ -48,10 +57,11 @@ void CompactionPicker::Init() { max_file_size_[i] = ULLONG_MAX; level_max_bytes_[i] = options_->max_bytes_for_level_base; } else if (i > 1) { - max_file_size_[i] = max_file_size_[i - 1] * target_file_size_multiplier; - level_max_bytes_[i] = - level_max_bytes_[i - 1] * max_bytes_multiplier * - options_->max_bytes_for_level_multiplier_additional[i - 1]; + max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1], + target_file_size_multiplier); + level_max_bytes_[i] = MultiplyCheckOverflow( + MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier), + options_->max_bytes_for_level_multiplier_additional[i - 1]); } else { max_file_size_[i] = options_->target_file_size_base; level_max_bytes_[i] = options_->max_bytes_for_level_base; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 0fe086a18..ee77cc4c7 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -27,9 +27,6 @@ class CompactionPicker { CompactionPicker(const Options* options, const InternalKeyComparator* icmp); virtual ~CompactionPicker(); - // See VersionSet::ReduceNumberOfLevels() - void ReduceNumberOfLevels(int new_levels); - // Pick level and inputs for a new compaction. // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that @@ -120,8 +117,6 @@ class CompactionPicker { const Options* const options_; private: - void Init(); - int num_levels_; const InternalKeyComparator* const icmp_; diff --git a/db/db_bench.cc b/db/db_bench.cc index a8a0a3d6a..e175a5e13 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -21,6 +21,8 @@ #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/write_batch.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "rocksdb/perf_context.h" #include "port/port.h" diff --git a/db/db_impl.cc b/db/db_impl.cc index 8adbf133a..0379c0921 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "db/builder.h" @@ -32,6 +33,7 @@ #include "db/prefix_filter_iterator.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" +#include "db/tailing_iter.h" #include "db/transaction_log_impl.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -265,8 +267,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) bg_cv_(&mutex_), mem_rep_factory_(options_.memtable_factory.get()), mem_(new MemTable(internal_comparator_, options_)), + imm_(options_.min_write_buffer_number_to_merge), logfile_number_(0), super_version_(nullptr), + super_version_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), bg_manual_only_(0), @@ -359,7 +363,7 @@ DBImpl::~DBImpl() { delete mem_->Unref(); } - imm_.UnrefAll(&to_delete); + imm_.current()->Unref(&to_delete); for (MemTable* m: to_delete) { delete m; } @@ -506,7 +510,7 @@ bool DBImpl::SuperVersion::Unref() { void DBImpl::SuperVersion::Cleanup() { assert(refs.load(std::memory_order_relaxed) == 0); - imm.UnrefAll(&to_delete); + imm->Unref(&to_delete); MemTable* m = mem->Unref(); if (m != nullptr) { to_delete.push_back(m); @@ -514,13 +518,13 @@ void DBImpl::SuperVersion::Cleanup() { current->Unref(); } -void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm, +void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current) { mem = new_mem; imm = new_imm; current = new_current; mem->Ref(); - imm.RefAll(); + imm->Ref(); current->Ref(); refs.store(1, std::memory_order_relaxed); } @@ -894,6 +898,11 @@ Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) { return s; } + s = env_->NewDirectory(dbname_, &db_directory_); + if (!s.ok()) { + return s; + } + s = env_->LockFile(LockFileName(dbname_), &db_lock_); if (!s.ok()) { return s; @@ -1187,6 +1196,9 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, (unsigned long) meta.number, (unsigned long) meta.file_size, s.ToString().c_str()); + if (!options_.disableDataSync) { + db_directory_->Fsync(); + } mutex_.Lock(); } base->Unref(); @@ -1235,7 +1247,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, mutex_.AssertHeld(); assert(imm_.size() != 0); - if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + if (!imm_.IsFlushPending()) { Log(options_.info_log, "FlushMemTableToOutputFile already in progress"); Status s = Status::IOError("FlushMemTableToOutputFile already in progress"); return s; @@ -1280,8 +1292,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( - mems, versions_.get(), s, &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free); + mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, + pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); if (s.ok()) { InstallSuperVersion(deletion_state); @@ -1302,11 +1314,16 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, return s; } -void DBImpl::CompactRange(const Slice* begin, - const Slice* end, - bool reduce_level, - int target_level) { - FlushMemTable(FlushOptions()); +Status DBImpl::CompactRange(const Slice* begin, + const Slice* end, + bool reduce_level, + int target_level) { + Status s = FlushMemTable(FlushOptions()); + if (!s.ok()) { + LogFlush(options_.info_log); + return s; + } + int max_level_with_files = 1; { MutexLock l(&mutex_); @@ -1322,16 +1339,22 @@ void DBImpl::CompactRange(const Slice* begin, // bottom-most level, the output level will be the same as input one if (options_.compaction_style == kCompactionStyleUniversal || level == max_level_with_files) { - RunManualCompaction(level, level, begin, end); + s = RunManualCompaction(level, level, begin, end); } else { - RunManualCompaction(level, level + 1, begin, end); + s = RunManualCompaction(level, level + 1, begin, end); + } + if (!s.ok()) { + LogFlush(options_.info_log); + return s; } } if (reduce_level) { - ReFitLevel(max_level_with_files, target_level); + s = ReFitLevel(max_level_with_files, target_level); } LogFlush(options_.info_log); + + return s; } // return the same level if it cannot be moved @@ -1350,7 +1373,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) { return minimum_level; } -void DBImpl::ReFitLevel(int level, int target_level) { +Status DBImpl::ReFitLevel(int level, int target_level) { assert(level < NumberLevels()); SuperVersion* superversion_to_free = nullptr; @@ -1363,7 +1386,7 @@ void DBImpl::ReFitLevel(int level, int target_level) { mutex_.Unlock(); Log(options_.info_log, "ReFitLevel: another thread is refitting"); delete new_superversion; - return; + return Status::NotSupported("another thread is refitting"); } refitting_level_ = true; @@ -1384,6 +1407,7 @@ void DBImpl::ReFitLevel(int level, int target_level) { assert(to_level <= level); + Status status; if (to_level < level) { Log(options_.info_log, "Before refitting:\n%s", versions_->current()->DebugString().data()); @@ -1397,7 +1421,7 @@ void DBImpl::ReFitLevel(int level, int target_level) { Log(options_.info_log, "Apply version edit:\n%s", edit.DebugString().data()); - auto status = versions_->LogAndApply(&edit, &mutex_); + status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); superversion_to_free = InstallSuperVersion(new_superversion); new_superversion = nullptr; @@ -1415,6 +1439,7 @@ void DBImpl::ReFitLevel(int level, int target_level) { mutex_.Unlock(); delete superversion_to_free; delete new_superversion; + return status; } int DBImpl::NumberLevels() { @@ -1429,6 +1454,10 @@ int DBImpl::Level0StopWriteTrigger() { return options_.level0_stop_writes_trigger; } +uint64_t DBImpl::CurrentVersionNumber() const { + return super_version_number_.load(); +} + Status DBImpl::Flush(const FlushOptions& options) { return FlushMemTable(options); } @@ -1622,10 +1651,10 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path, return status; } -void DBImpl::RunManualCompaction(int input_level, - int output_level, - const Slice* begin, - const Slice* end) { +Status DBImpl::RunManualCompaction(int input_level, + int output_level, + const Slice* begin, + const Slice* end) { assert(input_level >= 0); InternalKey begin_storage, end_storage; @@ -1692,15 +1721,16 @@ void DBImpl::RunManualCompaction(int input_level, assert(!manual.in_progress); assert(bg_manual_only_ > 0); --bg_manual_only_; + return manual.status; } -void DBImpl::TEST_CompactRange(int level, - const Slice* begin, - const Slice* end) { +Status DBImpl::TEST_CompactRange(int level, + const Slice* begin, + const Slice* end) { int output_level = (options_.compaction_style == kCompactionStyleUniversal) ? level : level + 1; - RunManualCompaction(level, output_level, begin, end); + return RunManualCompaction(level, output_level, begin, end); } Status DBImpl::FlushMemTable(const FlushOptions& options) { @@ -1756,8 +1786,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else { - bool is_flush_pending = - imm_.IsFlushPending(options_.min_write_buffer_number_to_merge); + bool is_flush_pending = imm_.IsFlushPending(); if (is_flush_pending && (bg_flush_scheduled_ < options_.max_background_flushes)) { // memtable flush needed @@ -1770,7 +1799,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // max_background_compactions hasn't been reached and, in case // bg_manual_only_ > 0, if it's a manual compaction. if ((manual_compaction_ || - versions_->NeedsCompaction() || + versions_->current()->NeedsCompaction() || (is_flush_pending && (options_.max_background_flushes <= 0))) && bg_compaction_scheduled_ < options_.max_background_compactions && (!bg_manual_only_ || manual_compaction_)) { @@ -1792,8 +1821,7 @@ void DBImpl::BGWorkCompaction(void* db) { Status DBImpl::BackgroundFlush(bool* madeProgress, DeletionState& deletion_state) { Status stat; - while (stat.ok() && - imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + while (stat.ok() && imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", options_.max_background_flushes - bg_flush_scheduled_); @@ -1913,7 +1941,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, mutex_.AssertHeld(); // TODO: remove memtable flush from formal compaction - while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + while (imm_.IsFlushPending()) { Log(options_.info_log, "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots " "available %d", @@ -1964,7 +1992,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); - status = versions_->LogAndApply(c->edit(), &mutex_); + status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(deletion_state); Version::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1999,6 +2027,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (is_manual) { ManualCompaction* m = manual_compaction_; if (!status.ok()) { + m->status = status; m->done = true; } // For universal compaction: @@ -2211,7 +2240,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->output_level(), out.number, out.file_size, out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); } - return versions_->LogAndApply(compact->compaction->edit(), &mutex_); + return versions_->LogAndApply(compact->compaction->edit(), &mutex_, + db_directory_.get()); } // @@ -2318,7 +2348,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, const uint64_t imm_start = env_->NowMicros(); LogFlush(options_.info_log); mutex_.Lock(); - if (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + if (imm_.IsFlushPending()) { FlushMemTableToOutputFile(nullptr, deletion_state); bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary } @@ -2584,6 +2614,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } input.reset(); + if (!options_.disableDataSync) { + db_directory_->Fsync(); + } CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros); @@ -2651,8 +2684,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, namespace { struct IterState { port::Mutex* mu; - Version* version; - autovector mem; // includes both mem_ and imm_ + Version* version = nullptr; + MemTable* mem = nullptr; + MemTableListVersion* imm = nullptr; DBImpl *db; }; @@ -2660,19 +2694,23 @@ static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); DBImpl::DeletionState deletion_state; state->mu->Lock(); - auto mems_size = state->mem.size(); - for (size_t i = 0; i < mems_size; i++) { - MemTable* m = state->mem[i]->Unref(); + if (state->mem) { // not set for immutable iterator + MemTable* m = state->mem->Unref(); if (m != nullptr) { deletion_state.memtables_to_free.push_back(m); } } - if (state->version->Unref()) { - // fast path FindObsoleteFiles - state->db->FindObsoleteFiles(deletion_state, false, true); + if (state->version) { // not set for memtable-only iterator + state->version->Unref(); + } + if (state->imm) { // not set for memtable-only iterator + state->imm->Unref(&deletion_state.memtables_to_free); } + // fast path FindObsoleteFiles + state->db->FindObsoleteFiles(deletion_state, false, true); state->mu->Unlock(); state->db->PurgeObsoleteFiles(deletion_state); + delete state; } } // namespace @@ -2681,7 +2719,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { IterState* cleanup = new IterState; MemTable* mutable_mem; - autovector immutables; + MemTableListVersion* immutable_mems; Version* version; // Collect together all needed child iterators for mem @@ -2690,27 +2728,22 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, mem_->Ref(); mutable_mem = mem_; // Collect together all needed child iterators for imm_ - imm_.GetMemTables(&immutables); - for (unsigned int i = 0; i < immutables.size(); i++) { - immutables[i]->Ref(); - } - // Collect iterators for files in L0 - Ln + immutable_mems = imm_.current(); + immutable_mems->Ref(); versions_->current()->Ref(); version = versions_->current(); mutex_.Unlock(); - std::vector memtables; - memtables.push_back(mutable_mem->NewIterator(options)); - cleanup->mem.push_back(mutable_mem); - for (MemTable* m : immutables) { - memtables.push_back(m->NewIterator(options)); - cleanup->mem.push_back(m); - } - version->AddIterators(options, storage_options_, &memtables); + std::vector iterator_list; + iterator_list.push_back(mutable_mem->NewIterator(options)); + cleanup->mem = mutable_mem; + cleanup->imm = immutable_mems; + // Collect all needed child iterators for immutable memtables + immutable_mems->AddIterators(options, &iterator_list); + // Collect iterators for files in L0 - Ln + version->AddIterators(options, storage_options_, &iterator_list); Iterator* internal_iter = NewMergingIterator( - env_, &internal_comparator_, memtables.data(), memtables.size() - ); - + env_, &internal_comparator_, &iterator_list[0], iterator_list.size()); cleanup->version = version; cleanup->mu = &mutex_; cleanup->db = this; @@ -2724,6 +2757,60 @@ Iterator* DBImpl::TEST_NewInternalIterator() { return NewInternalIterator(ReadOptions(), &ignored); } +std::pair DBImpl::GetTailingIteratorPair( + const ReadOptions& options, + uint64_t* superversion_number) { + + MemTable* mutable_mem; + MemTableListVersion* immutable_mems; + Version* version; + + // get all child iterators and bump their refcounts under lock + mutex_.Lock(); + mutable_mem = mem_; + mutable_mem->Ref(); + immutable_mems = imm_.current(); + immutable_mems->Ref(); + version = versions_->current(); + version->Ref(); + if (superversion_number != nullptr) { + *superversion_number = CurrentVersionNumber(); + } + mutex_.Unlock(); + + Iterator* mutable_iter = mutable_mem->NewIterator(options); + IterState* mutable_cleanup = new IterState(); + mutable_cleanup->mem = mutable_mem; + mutable_cleanup->db = this; + mutable_cleanup->mu = &mutex_; + mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); + + // create a DBIter that only uses memtable content; see NewIterator() + mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), + mutable_iter, kMaxSequenceNumber); + + Iterator* immutable_iter; + IterState* immutable_cleanup = new IterState(); + std::vector list; + immutable_mems->AddIterators(options, &list); + immutable_cleanup->imm = immutable_mems; + version->AddIterators(options, storage_options_, &list); + immutable_cleanup->version = version; + immutable_cleanup->db = this; + immutable_cleanup->mu = &mutex_; + + immutable_iter = + NewMergingIterator(env_, &internal_comparator_, &list[0], list.size()); + immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, + nullptr); + + // create a DBIter that only uses memtable content; see NewIterator() + immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), + immutable_iter, kMaxSequenceNumber); + + return std::make_pair(mutable_iter, immutable_iter); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); return versions_->current()->MaxNextLevelOverlappingBytes(); @@ -2763,9 +2850,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { DBImpl::SuperVersion* DBImpl::InstallSuperVersion( SuperVersion* new_superversion) { mutex_.AssertHeld(); - new_superversion->Init(mem_, imm_, versions_->current()); + new_superversion->Init(mem_, imm_.current(), versions_->current()); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; + ++super_version_number_; if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex @@ -2809,7 +2897,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) { + } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { @@ -2875,10 +2963,10 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } MemTable* mem = mem_; - MemTableList imm = imm_; + MemTableListVersion* imm = imm_.current(); Version* current = versions_->current(); mem->Ref(); - imm.RefAll(); + imm->Ref(); current->Ref(); // Unlock while reading from files and memtables @@ -2911,7 +2999,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, LookupKey lkey(keys[i], snapshot); if (mem->Get(lkey, value, &s, merge_context, options_)) { // Done - } else if (imm.Get(lkey, value, &s, merge_context, options_)) { + } else if (imm->Get(lkey, value, &s, merge_context, options_)) { // Done } else { current->Get(options, lkey, value, &s, &merge_context, &stats, options_); @@ -2932,7 +3020,7 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, MaybeScheduleFlushOrCompaction(); } MemTable* m = mem->Unref(); - imm.UnrefAll(&to_delete); + imm->Unref(&to_delete); current->Unref(); mutex_.Unlock(); @@ -2967,13 +3055,21 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, } Iterator* DBImpl::NewIterator(const ReadOptions& options) { - SequenceNumber latest_snapshot; - Iterator* iter = NewInternalIterator(options, &latest_snapshot); - iter = NewDBIterator( - &dbname_, env_, options_, user_comparator(), iter, - (options.snapshot != nullptr - ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + Iterator* iter; + + if (options.tailing) { + iter = new TailingIterator(this, options, user_comparator()); + } else { + SequenceNumber latest_snapshot; + iter = NewInternalIterator(options, &latest_snapshot); + + iter = NewDBIterator( + &dbname_, env_, options_, user_comparator(), iter, + (options.snapshot != nullptr + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot)); + } + if (options.prefix) { // use extra wrapper to exclude any keys from the results which // don't begin with the prefix @@ -3309,12 +3405,11 @@ Status DBImpl::MakeRoomForWrite(bool force, RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); stall_level0_num_files_ += stall; stall_level0_num_files_count_++; - } else if ( - allow_hard_rate_limit_delay && - options_.hard_rate_limit > 1.0 && - (score = versions_->MaxCompactionScore()) > options_.hard_rate_limit) { + } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && + (score = versions_->current()->MaxCompactionScore()) > + options_.hard_rate_limit) { // Delay a write when the compaction score for any level is too large. - int max_level = versions_->MaxCompactionScoreLevel(); + int max_level = versions_->current()->MaxCompactionScoreLevel(); mutex_.Unlock(); uint64_t delayed; { @@ -3336,10 +3431,9 @@ Status DBImpl::MakeRoomForWrite(bool force, allow_hard_rate_limit_delay = false; } mutex_.Lock(); - } else if ( - allow_soft_rate_limit_delay && - options_.soft_rate_limit > 0.0 && - (score = versions_->MaxCompactionScore()) > options_.soft_rate_limit) { + } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 && + (score = versions_->current()->MaxCompactionScore()) > + options_.soft_rate_limit) { // Delay a write when the compaction score for any level is too large. // TODO: add statistics mutex_.Unlock(); @@ -3494,8 +3588,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n" - "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) RW-Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count msComp msStall Ln-stall Stall-cnt\n" + "------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); for (int level = 0; level < current->NumberLevels(); level++) { @@ -3515,9 +3609,21 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { total_bytes_read += bytes_read; total_bytes_written += stats_[level].bytes_written; + uint64_t stalls = level == 0 ? + (stall_level0_slowdown_count_ + + stall_level0_num_files_count_ + + stall_memtable_compaction_count_) : + stall_leveln_slowdown_count_[level]; + + double stall_us = level == 0 ? + (stall_level0_slowdown_ + + stall_level0_num_files_ + + stall_memtable_compaction_) : + stall_leveln_slowdown_[level]; + snprintf( buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n", + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f %9lu\n", level, files, current->NumLevelBytes(level) / 1048576.0, @@ -3539,8 +3645,13 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { stats_[level].files_out_levelnp1, stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, stats_[level].count, - stall_leveln_slowdown_[level] / 1000000.0, - (unsigned long) stall_leveln_slowdown_count_[level]); + (int) ((double) stats_[level].micros / + 1000.0 / + (stats_[level].count + 1)), + (double) stall_us / 1000.0 / (stalls + 1), + stall_us / 1000000.0, + (unsigned long) stalls); + total_slowdown += stall_leveln_slowdown_[level]; total_slowdown_count += stall_leveln_slowdown_count_[level]; value->append(buf); @@ -3788,7 +3899,7 @@ Status DBImpl::DeleteFile(std::string name) { } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(&edit, &mutex_); + status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); if (status.ok()) { InstallSuperVersion(deletion_state); } @@ -3896,7 +4007,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); - s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + s = impl->versions_->LogAndApply(&edit, &impl->mutex_, + impl->db_directory_.get()); } if (s.ok()) { delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); @@ -3904,6 +4016,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->DeleteObsoleteFiles(); impl->MaybeScheduleFlushOrCompaction(); impl->MaybeScheduleLogDBDeployStats(); + s = impl->db_directory_->Fsync(); } } diff --git a/db/db_impl.h b/db/db_impl.h index 3c6010da1..b158e648a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include "db/dbformat.h" @@ -65,8 +66,8 @@ class DBImpl : public DB { virtual void ReleaseSnapshot(const Snapshot* snapshot); virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1); + virtual Status CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false, int target_level = -1); virtual int NumberLevels(); virtual int MaxMemCompactionLevel(); virtual int Level0StopWriteTrigger(); @@ -91,17 +92,17 @@ class DBImpl : public DB { virtual Status GetDbIdentity(std::string& identity); - void RunManualCompaction(int input_level, - int output_level, - const Slice* begin, - const Slice* end); + Status RunManualCompaction(int input_level, + int output_level, + const Slice* begin, + const Slice* end); // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin, *end] - void TEST_CompactRange(int level, - const Slice* begin, - const Slice* end); + Status TEST_CompactRange(int level, + const Slice* begin, + const Slice* end); // Force current memtable contents to be flushed. Status TEST_FlushMemTable(); @@ -141,10 +142,10 @@ class DBImpl : public DB { // holds references to memtable, all immutable memtables and version struct SuperVersion { MemTable* mem; - MemTableList imm; + MemTableListVersion* imm; Version* current; std::atomic refs; - // We need to_delete because during Cleanup(), imm.UnrefAll() returns + // We need to_delete because during Cleanup(), imm->Unref() returns // all memtables that we need to free through this vector. We then // delete all those memtables outside of mutex, during destruction autovector to_delete; @@ -162,7 +163,7 @@ class DBImpl : public DB { // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex void Cleanup(); - void Init(MemTable* new_mem, const MemTableList& new_imm, + void Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); }; @@ -256,6 +257,7 @@ class DBImpl : public DB { private: friend class DB; + friend class TailingIterator; struct CompactionState; struct Writer; @@ -357,7 +359,18 @@ class DBImpl : public DB { // Move the files in the input level to the target level. // If target_level < 0, automatically calculate the minimum level that could // hold the data set. - void ReFitLevel(int level, int target_level = -1); + Status ReFitLevel(int level, int target_level = -1); + + // Returns the current SuperVersion number. + uint64_t CurrentVersionNumber() const; + + // Returns a pair of iterators (mutable-only and immutable-only) used + // internally by TailingIterator and stores CurrentVersionNumber() in + // *superversion_number. These iterators are always up-to-date, i.e. can + // be used to read new data. + std::pair GetTailingIteratorPair( + const ReadOptions& options, + uint64_t* superversion_number); // Constant after construction const InternalFilterPolicy internal_filter_policy_; @@ -381,8 +394,15 @@ class DBImpl : public DB { SuperVersion* super_version_; + // An ordinal representing the current SuperVersion. Updated by + // InstallSuperVersion(), i.e. incremented every time super_version_ + // changes. + std::atomic super_version_number_; + std::string host_name_; + std::unique_ptr db_directory_; + // Queue of writers. std::deque writers_; WriteBatch tmp_batch_; @@ -412,6 +432,7 @@ class DBImpl : public DB { int input_level; int output_level; bool done; + Status status; bool in_progress; // compaction request being processed? const InternalKey* begin; // nullptr means beginning of key range const InternalKey* end; // nullptr means end of key range diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 4beaedd01..57eae0e26 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -49,8 +49,9 @@ public: virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual void CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1) { + virtual Status CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false, int target_level = -1) { + return Status::NotSupported("Not supported operation in read only mode."); } virtual Status DisableFileDeletions() { return Status::NotSupported("Not supported operation in read only mode."); diff --git a/db/db_test.cc b/db/db_test.cc index 2f1d2e6fc..56059371c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -22,15 +22,18 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" #include "rocksdb/plain_table_factory.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/table.h" +#include "table/block_based_table_factory.h" #include "util/hash.h" #include "util/hash_linklist_rep.h" -#include "utilities/merge_operators.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" +#include "utilities/merge_operators.h" namespace rocksdb { @@ -838,6 +841,9 @@ TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) { options.filter_policy = filter_policy.get(); options.create_if_missing = true; options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); DestroyAndReopen(&options); ASSERT_OK(db_->Put(WriteOptions(), "key", "val")); @@ -4789,8 +4795,9 @@ class ModelDB: public DB { sizes[i] = 0; } } - virtual void CompactRange(const Slice* start, const Slice* end, - bool reduce_level, int target_level) { + virtual Status CompactRange(const Slice* start, const Slice* end, + bool reduce_level, int target_level) { + return Status::NotSupported("Not supported operation."); } virtual int NumberLevels() @@ -5271,6 +5278,118 @@ void BM_LogAndApply(int iters, int num_base_files) { buf, iters, us, ((float)us) / iters); } +TEST(DBTest, TailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST(DBTest, TailingIteratorKeepAdding) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, TailingIteratorDeletes) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(db_->Put(WriteOptions(), "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(db_->Delete(WriteOptions(), "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + // force a flush to make sure that no records are read from memtable + dbfull()->TEST_FlushMemTable(); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) ; + + ASSERT_EQ(count, num_records); +} + +TEST(DBTest, TailingIteratorPrefixSeek) { + ReadOptions read_options; + read_options.tailing = true; + read_options.prefix_seek = true; + + auto prefix_extractor = NewFixedPrefixTransform(2); + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor = prefix_extractor; + options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor)); + DestroyAndReopen(&options); + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(db_->Put(WriteOptions(), "0101", "test")); + + dbfull()->TEST_FlushMemTable(); + + ASSERT_OK(db_->Put(WriteOptions(), "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/log_reader.cc b/db/log_reader.cc index 6596cd84f..1dc567413 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -28,6 +28,8 @@ Reader::Reader(unique_ptr&& file, Reporter* reporter, backing_store_(new char[kBlockSize]), buffer_(), eof_(false), + read_error_(false), + eof_offset_(0), last_record_offset_(0), end_of_buffer_offset_(0), initial_offset_(initial_offset) { @@ -170,6 +172,69 @@ uint64_t Reader::LastRecordOffset() { return last_record_offset_; } +void Reader::UnmarkEOF() { + if (read_error_) { + return; + } + + eof_ = false; + + if (eof_offset_ == 0) { + return; + } + + // If the EOF was in the middle of a block (a partial block was read) we have + // to read the rest of the block as ReadPhysicalRecord can only read full + // blocks and expects the file position indicator to be aligned to the start + // of a block. + // + // consumed_bytes + buffer_size() + remaining == kBlockSize + + size_t consumed_bytes = eof_offset_ - buffer_.size(); + size_t remaining = kBlockSize - eof_offset_; + + // backing_store_ is used to concatenate what is left in buffer_ and + // the remainder of the block. If buffer_ already uses backing_store_, + // we just append the new data. + if (buffer_.data() != backing_store_ + consumed_bytes) { + // Buffer_ does not use backing_store_ for storage. + // Copy what is left in buffer_ to backing_store. + memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); + } + + Slice read_buffer; + Status status = file_->Read(remaining, &read_buffer, + backing_store_ + eof_offset_); + + size_t added = read_buffer.size(); + end_of_buffer_offset_ += added; + + if (!status.ok()) { + if (added > 0) { + ReportDrop(added, status); + } + + read_error_ = true; + return; + } + + if (read_buffer.data() != backing_store_ + eof_offset_) { + // Read did not write to backing_store_ + memmove(backing_store_ + eof_offset_, read_buffer.data(), + read_buffer.size()); + } + + buffer_ = Slice(backing_store_ + consumed_bytes, + eof_offset_ + added - consumed_bytes); + + if (added < remaining) { + eof_ = true; + eof_offset_ += added; + } else { + eof_offset_ = 0; + } +} + void Reader::ReportCorruption(size_t bytes, const char* reason) { ReportDrop(bytes, Status::Corruption(reason)); } @@ -184,7 +249,7 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) { unsigned int Reader::ReadPhysicalRecord(Slice* result) { while (true) { if (buffer_.size() < (size_t)kHeaderSize) { - if (!eof_) { + if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); Status status = file_->Read(kBlockSize, &buffer_, backing_store_); @@ -192,10 +257,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { if (!status.ok()) { buffer_.clear(); ReportDrop(kBlockSize, status); - eof_ = true; + read_error_ = true; return kEof; } else if (buffer_.size() < (size_t)kBlockSize) { eof_ = true; + eof_offset_ = buffer_.size(); } continue; } else if (buffer_.size() == 0) { diff --git a/db/log_reader.h b/db/log_reader.h index 8e277c821..81d334da2 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -69,9 +69,10 @@ class Reader { // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. - void UnmarkEOF() { - eof_ = false; - } + // Also aligns the file position indicator to the start of the next block + // by reading the rest of the data from the EOF position to the end of the + // block that was partially read. + void UnmarkEOF(); SequentialFile* file() { return file_.get(); } @@ -82,6 +83,11 @@ class Reader { char* const backing_store_; Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize + bool read_error_; // Error occurred while reading from file + + // Offset of the file position indicator within the last block when an + // EOF was detected. + size_t eof_offset_; // Offset of the last record returned by ReadRecord. uint64_t last_record_offset_; diff --git a/db/log_test.cc b/db/log_test.cc index dedbff0aa..636518835 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -47,36 +47,93 @@ class LogTest { public: std::string contents_; + explicit StringDest(Slice& reader_contents) : + WritableFile(), + contents_(""), + reader_contents_(reader_contents), + last_flush_(0) { + reader_contents_ = Slice(contents_.data(), 0); + }; + virtual Status Close() { return Status::OK(); } - virtual Status Flush() { return Status::OK(); } + virtual Status Flush() { + ASSERT_TRUE(reader_contents_.size() <= last_flush_); + size_t offset = last_flush_ - reader_contents_.size(); + reader_contents_ = Slice( + contents_.data() + offset, + contents_.size() - offset); + last_flush_ = contents_.size(); + + return Status::OK(); + } virtual Status Sync() { return Status::OK(); } virtual Status Append(const Slice& slice) { contents_.append(slice.data(), slice.size()); return Status::OK(); } + void Drop(size_t bytes) { + contents_.resize(contents_.size() - bytes); + reader_contents_ = Slice( + reader_contents_.data(), reader_contents_.size() - bytes); + last_flush_ = contents_.size(); + } + + private: + Slice& reader_contents_; + size_t last_flush_; }; class StringSource : public SequentialFile { public: - Slice contents_; + Slice& contents_; bool force_error_; + size_t force_error_position_; + bool force_eof_; + size_t force_eof_position_; bool returned_partial_; - StringSource() : force_error_(false), returned_partial_(false) { } + explicit StringSource(Slice& contents) : + contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false) { } virtual Status Read(size_t n, Slice* result, char* scratch) { ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error"; if (force_error_) { - force_error_ = false; - returned_partial_ = true; - return Status::Corruption("read error"); + if (force_error_position_ >= n) { + force_error_position_ -= n; + } else { + *result = Slice(contents_.data(), force_error_position_); + contents_.remove_prefix(force_error_position_); + force_error_ = false; + returned_partial_ = true; + return Status::Corruption("read error"); + } } if (contents_.size() < n) { n = contents_.size(); returned_partial_ = true; } - *result = Slice(contents_.data(), n); + + if (force_eof_) { + if (force_eof_position_ >= n) { + force_eof_position_ -= n; + } else { + force_eof_ = false; + n = force_eof_position_; + returned_partial_ = true; + } + } + + // By using scratch we ensure that caller has control over the + // lifetime of result.data() + memcpy(scratch, contents_.data(), n); + *result = Slice(scratch, n); + contents_.remove_prefix(n); return Status::OK(); } @@ -123,10 +180,10 @@ class LogTest { src->contents_ = dest_contents(); } + Slice reader_contents_; unique_ptr dest_holder_; unique_ptr source_holder_; ReportCollector report_; - bool reading_; Writer writer_; Reader reader_; @@ -135,16 +192,15 @@ class LogTest { static uint64_t initial_offset_last_record_offsets_[]; public: - LogTest() : dest_holder_(new StringDest), - source_holder_(new StringSource), - reading_(false), + LogTest() : reader_contents_(), + dest_holder_(new StringDest(reader_contents_)), + source_holder_(new StringSource(reader_contents_)), writer_(std::move(dest_holder_)), reader_(std::move(source_holder_), &report_, true/*checksum*/, 0/*initial_offset*/) { } void Write(const std::string& msg) { - ASSERT_TRUE(!reading_) << "Write() after starting to read"; writer_.AddRecord(Slice(msg)); } @@ -153,10 +209,6 @@ class LogTest { } std::string Read() { - if (!reading_) { - reading_ = true; - reset_source_contents(); - } std::string scratch; Slice record; if (reader_.ReadRecord(&record, &scratch)) { @@ -175,7 +227,9 @@ class LogTest { } void ShrinkSize(int bytes) { - dest_contents().resize(dest_contents().size() - bytes); + auto dest = dynamic_cast(writer_.file()); + assert(dest); + dest->Drop(bytes); } void FixChecksum(int header_offset, int len) { @@ -185,9 +239,10 @@ class LogTest { EncodeFixed32(&dest_contents()[header_offset], crc); } - void ForceError() { + void ForceError(size_t position = 0) { auto src = dynamic_cast(reader_.file()); src->force_error_ = true; + src->force_error_position_ = position; } size_t DroppedBytes() const { @@ -198,6 +253,22 @@ class LogTest { return report_.message_; } + void ForceEOF(size_t position = 0) { + auto src = dynamic_cast(reader_.file()); + src->force_eof_ = true; + src->force_eof_position_ = position; + } + + void UnmarkEOF() { + auto src = dynamic_cast(reader_.file()); + src->returned_partial_ = false; + reader_.UnmarkEOF(); + } + + bool IsEOF() { + return reader_.IsEOF(); + } + // Returns OK iff recorded error message contains "msg" std::string MatchError(const std::string& msg) const { if (report_.message_.find(msg) == std::string::npos) { @@ -217,9 +288,7 @@ class LogTest { void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr source(new StringSource(reader_contents_)); unique_ptr offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, WrittenBytes() + offset_past_end)); @@ -231,9 +300,7 @@ class LogTest { void CheckInitialOffsetRecord(uint64_t initial_offset, int expected_record_offset) { WriteInitialOffsetLog(); - reading_ = true; - unique_ptr source(new StringSource); - source->contents_ = dest_contents(); + unique_ptr source(new StringSource(reader_contents_)); unique_ptr offset_reader( new Reader(std::move(source), &report_, true/*checksum*/, initial_offset)); @@ -520,6 +587,70 @@ TEST(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); } +TEST(LogTest, ClearEofSingleBlock) { + Write("foo"); + Write("bar"); + ForceEOF(3 + kHeaderSize + 2); + ASSERT_EQ("foo", Read()); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_TRUE(IsEOF()); + ASSERT_EQ("EOF", Read()); + Write("xxx"); + UnmarkEOF(); + ASSERT_EQ("xxx", Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofMultiBlock) { + size_t num_full_blocks = 5; + size_t n = (kBlockSize - kHeaderSize) * num_full_blocks + 25; + Write(BigString("foo", n)); + Write(BigString("bar", n)); + ForceEOF(n + num_full_blocks * kHeaderSize + 10); + ASSERT_EQ(BigString("foo", n), Read()); + ASSERT_TRUE(IsEOF()); + UnmarkEOF(); + ASSERT_EQ(BigString("bar", n), Read()); + ASSERT_TRUE(IsEOF()); + Write(BigString("xxx", n)); + UnmarkEOF(); + ASSERT_EQ(BigString("xxx", n), Read()); + ASSERT_TRUE(IsEOF()); +} + +TEST(LogTest, ClearEofError) { + // If an error occurs during Read() in UnmarkEOF(), the records contained + // in the buffer should be returned on subsequent calls of ReadRecord() + // until no more full records are left, whereafter ReadRecord() should return + // false to indicate that it cannot read any further. + + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + ASSERT_TRUE(IsEOF()); + Write("xxx"); + ForceError(0); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); +} + +TEST(LogTest, ClearEofError2) { + Write("foo"); + Write("bar"); + UnmarkEOF(); + ASSERT_EQ("foo", Read()); + Write("xxx"); + ForceError(3); + UnmarkEOF(); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ(3U, DroppedBytes()); + ASSERT_EQ("OK", MatchError("read error")); +} + } // namespace log } // namespace rocksdb diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 7197a92ea..4548bd298 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -16,41 +16,85 @@ namespace rocksdb { class InternalKeyComparator; class Mutex; -class MemTableListIterator; class VersionSet; -using std::list; - -// Increase reference count on all underling memtables -void MemTableList::RefAll() { - for (auto &memtable : memlist_) { - memtable->Ref(); +MemTableListVersion::MemTableListVersion(MemTableListVersion* old) { + if (old != nullptr) { + memlist_ = old->memlist_; + size_ = old->size_; + for (auto& m : memlist_) { + m->Ref(); + } } } -// Drop reference count on all underling memtables. If the -// refcount of an underlying memtable drops to zero, then -// return it in to_delete vector. -void MemTableList::UnrefAll(autovector* to_delete) { - for (auto &memtable : memlist_) { - MemTable* m = memtable->Unref(); - if (m != nullptr) { - to_delete->push_back(m); +void MemTableListVersion::Ref() { ++refs_; } + +void MemTableListVersion::Unref(autovector* to_delete) { + assert(refs_ >= 1); + --refs_; + if (refs_ == 0) { + // if to_delete is equal to nullptr it means we're confident + // that refs_ will not be zero + assert(to_delete != nullptr); + for (const auto& m : memlist_) { + MemTable* x = m->Unref(); + if (x != nullptr) { + to_delete->push_back(x); + } } + delete this; } } +int MemTableListVersion::size() const { return size_; } + // Returns the total number of memtables in the list -int MemTableList::size() { - assert(num_flush_not_started_ <= size_); - return size_; +int MemTableList::size() const { + assert(num_flush_not_started_ <= current_->size_); + return current_->size_; +} + +// Search all the memtables starting from the most recent one. +// Return the most recent value found, if any. +// Operands stores the list of merge operations to apply, so far. +bool MemTableListVersion::Get(const LookupKey& key, std::string* value, + Status* s, MergeContext& merge_context, + const Options& options) { + for (auto& memtable : memlist_) { + if (memtable->Get(key, value, s, merge_context, options)) { + return true; + } + } + return false; +} + +void MemTableListVersion::AddIterators(const ReadOptions& options, + std::vector* iterator_list) { + for (auto& m : memlist_) { + iterator_list->push_back(m->NewIterator(options)); + } +} + +// caller is responsible for referencing m +void MemTableListVersion::Add(MemTable* m) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + memlist_.push_front(m); + ++size_; +} + +// caller is responsible for unreferencing m +void MemTableListVersion::Remove(MemTable* m) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + memlist_.remove(m); + --size_; } // Returns true if there is at least one memtable on which flush has // not yet started. -bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { +bool MemTableList::IsFlushPending() { if ((flush_requested_ && num_flush_not_started_ >= 1) || - (num_flush_not_started_ >= min_write_buffer_number_to_merge)) { + (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { assert(imm_flush_needed.NoBarrier_Load() != nullptr); return true; } @@ -59,7 +103,8 @@ bool MemTableList::IsFlushPending(int min_write_buffer_number_to_merge) { // Returns the memtables that need to be flushed. void MemTableList::PickMemtablesToFlush(autovector* ret) { - for (auto it = memlist_.rbegin(); it != memlist_.rend(); it++) { + const auto& memlist = current_->memlist_; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { MemTable* m = *it; if (!m->flush_in_progress_) { assert(!m->flush_completed_); @@ -67,21 +112,19 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { if (num_flush_not_started_ == 0) { imm_flush_needed.Release_Store(nullptr); } - m->flush_in_progress_ = true; // flushing will start very soon + m->flush_in_progress_ = true; // flushing will start very soon ret->push_back(m); } } - flush_requested_ = false; // start-flush request is complete + flush_requested_ = false; // start-flush request is complete } // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - const autovector &mems, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, - std::set& pending_outputs, - autovector* to_delete) { + const autovector& mems, VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, uint64_t file_number, + std::set& pending_outputs, autovector* to_delete, + Directory* db_directory) { mu->AssertHeld(); // If the flush was not successful, then just reset state. @@ -122,8 +165,8 @@ Status MemTableList::InstallMemtableFlushResults( // scan all memtables from the earliest, and commit those // (in that order) that have finished flushing. Memetables // are always committed in the order that they were created. - while (!memlist_.empty() && s.ok()) { - MemTable* m = memlist_.back(); // get the last element + while (!current_->memlist_.empty() && s.ok()) { + MemTable* m = current_->memlist_.back(); // get the last element if (!m->flush_completed_) { break; } @@ -133,7 +176,11 @@ Status MemTableList::InstallMemtableFlushResults( (unsigned long)m->file_number_); // this can release and reacquire the mutex. - s = vset->LogAndApply(&m->edit_, mu); + s = vset->LogAndApply(&m->edit_, mu, db_directory); + + // we will be changing the version in the next code path, + // so we better create a new one, since versions are immutable + InstallNewVersion(); // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. @@ -144,7 +191,7 @@ Status MemTableList::InstallMemtableFlushResults( "Level-0 commit table #%lu: memtable #%lu done", (unsigned long)m->file_number_, (unsigned long)mem_id); - memlist_.remove(m); + current_->Remove(m); assert(m->file_number_ > 0); // pending_outputs can be cleared only after the newly created file @@ -155,7 +202,6 @@ Status MemTableList::InstallMemtableFlushResults( if (m->Unref() != nullptr) { to_delete->push_back(m); } - size_--; } else { //commit failed. setup state so that we can flush again. Log(info_log, @@ -172,7 +218,7 @@ Status MemTableList::InstallMemtableFlushResults( s = Status::IOError("Unable to commit flushed memtable"); } ++mem_id; - } while (!memlist_.empty() && (m = memlist_.back()) && + } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) && m->file_number_ == file_number); } commit_in_progress_ = false; @@ -181,9 +227,14 @@ Status MemTableList::InstallMemtableFlushResults( // New memtables are inserted at the front of the list. void MemTableList::Add(MemTable* m) { - assert(size_ >= num_flush_not_started_); - size_++; - memlist_.push_front(m); + assert(current_->size_ >= num_flush_not_started_); + InstallNewVersion(); + // this method is used to move mutable memtable into an immutable list. + // since mutable memtable is already refcounted by the DBImpl, + // and when moving to the imutable list we don't unref it, + // we don't have to ref the memtable here. we just take over the + // reference from the DBImpl. + current_->Add(m); m->MarkImmutable(); num_flush_not_started_++; if (num_flush_not_started_ == 1) { @@ -194,28 +245,21 @@ void MemTableList::Add(MemTable* m) { // Returns an estimate of the number of bytes of data in use. size_t MemTableList::ApproximateMemoryUsage() { size_t size = 0; - for (auto &memtable : memlist_) { + for (auto& memtable : current_->memlist_) { size += memtable->ApproximateMemoryUsage(); } return size; } -// Search all the memtables starting from the most recent one. -// Return the most recent value found, if any. -// Operands stores the list of merge operations to apply, so far. -bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - MergeContext& merge_context, const Options& options) { - for (auto &memtable : memlist_) { - if (memtable->Get(key, value, s, merge_context, options)) { - return true; - } - } - return false; -} - -void MemTableList::GetMemTables(autovector* output) { - for (auto &memtable : memlist_) { - output->push_back(memtable); +void MemTableList::InstallNewVersion() { + if (current_->refs_ == 1) { + // we're the only one using the version, just keep using it + } else { + // somebody else holds the current version, we need to create new one + MemTableListVersion* version = current_; + current_ = new MemTableListVersion(current_); + current_->Ref(); + version->Unref(); } } diff --git a/db/memtable_list.h b/db/memtable_list.h index 9831d7621..01afa5cbe 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -13,62 +13,93 @@ #include "db/memtable.h" #include "db/skiplist.h" #include "rocksdb/db.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" #include "util/autovector.h" namespace rocksdb { class InternalKeyComparator; class Mutex; -class MemTableListIterator; -// +// keeps a list of immutable memtables in a vector. the list is immutable +// if refcount is bigger than one. It is used as a state for Get() and +// Iterator code paths +class MemTableListVersion { + public: + explicit MemTableListVersion(MemTableListVersion* old = nullptr); + + void Ref(); + void Unref(autovector* to_delete = nullptr); + + int size() const; + + // Search all the memtables starting from the most recent one. + // Return the most recent value found, if any. + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext& merge_context, const Options& options); + + void AddIterators(const ReadOptions& options, + std::vector* iterator_list); + + private: + // REQUIRE: m is mutable memtable + void Add(MemTable* m); + // REQUIRE: m is mutable memtable + void Remove(MemTable* m); + + friend class MemTableList; + std::list memlist_; + int size_ = 0; + int refs_ = 0; +}; + // This class stores references to all the immutable memtables. // The memtables are flushed to L0 as soon as possible and in // any order. If there are more than one immutable memtable, their // flushes can occur concurrently. However, they are 'committed' // to the manifest in FIFO order to maintain correctness and // recoverability from a crash. -// class MemTableList { public: // A list of memtables. - MemTableList() : size_(0), num_flush_not_started_(0), - commit_in_progress_(false), - flush_requested_(false) { + explicit MemTableList(int min_write_buffer_number_to_merge) + : min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), + current_(new MemTableListVersion()), + num_flush_not_started_(0), + commit_in_progress_(false), + flush_requested_(false) { imm_flush_needed.Release_Store(nullptr); + current_->Ref(); } - ~MemTableList() {}; + ~MemTableList() {} + + MemTableListVersion* current() { return current_; } // so that backgrund threads can detect non-nullptr pointer to // determine whether this is anything more to start flushing. port::AtomicPointer imm_flush_needed; - // Increase reference count on all underling memtables - void RefAll(); - - // Drop reference count on all underling memtables. If the refcount - // on an underlying memtable drops to zero, then return it in - // to_delete vector. - void UnrefAll(autovector* to_delete); - // Returns the total number of memtables in the list - int size(); + int size() const; // Returns true if there is at least one memtable on which flush has // not yet started. - bool IsFlushPending(int min_write_buffer_number_to_merge); + bool IsFlushPending(); // Returns the earliest memtables that needs to be flushed. The returned // memtables are guaranteed to be in the ascending order of created time. void PickMemtablesToFlush(autovector* mems); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(const autovector &m, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, - std::set& pending_outputs, - autovector* to_delete); + Status InstallMemtableFlushResults(const autovector& m, + VersionSet* vset, Status flushStatus, + port::Mutex* mu, Logger* info_log, + uint64_t file_number, + std::set& pending_outputs, + autovector* to_delete, + Directory* db_directory); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). @@ -77,14 +108,6 @@ class MemTableList { // Returns an estimate of the number of bytes of data in use. size_t ApproximateMemoryUsage(); - // Search all the memtables starting from the most recent one. - // Return the most recent value found, if any. - bool Get(const LookupKey& key, std::string* value, Status* s, - MergeContext& merge_context, const Options& options); - - // Returns the list of underlying memtables. - void GetMemTables(autovector* list); - // Request a flush of all existing memtables to storage void FlushRequested() { flush_requested_ = true; } @@ -93,8 +116,12 @@ class MemTableList { // void operator=(const MemTableList&); private: - std::list memlist_; - int size_; + // DB mutex held + void InstallNewVersion(); + + int min_write_buffer_number_to_merge_; + + MemTableListVersion* current_; // the number of elements that still need flushing int num_flush_not_started_; diff --git a/db/prefix_filter_iterator.h b/db/prefix_filter_iterator.h index f4488379c..e868c7a54 100644 --- a/db/prefix_filter_iterator.h +++ b/db/prefix_filter_iterator.h @@ -12,6 +12,8 @@ #pragma once #include "rocksdb/iterator.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" namespace rocksdb { diff --git a/db/skiplist.h b/db/skiplist.h index 54b4f7446..d6c81688e 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -35,6 +35,7 @@ #include #include "port/port.h" #include "util/random.h" +#include "rocksdb/arena.h" namespace rocksdb { diff --git a/db/tailing_iter.cc b/db/tailing_iter.cc new file mode 100644 index 000000000..5644b1211 --- /dev/null +++ b/db/tailing_iter.cc @@ -0,0 +1,175 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/tailing_iter.h" + +#include +#include +#include "db/db_impl.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" + +namespace rocksdb { + +TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options, + const Comparator* comparator) + : db_(db), options_(options), comparator_(comparator), + version_number_(0), current_(nullptr), + status_(Status::InvalidArgument("Seek() not called on this iterator")) {} + +bool TailingIterator::Valid() const { + return current_ != nullptr; +} + +void TailingIterator::SeekToFirst() { + if (!IsCurrentVersion()) { + CreateIterators(); + } + + mutable_->SeekToFirst(); + immutable_->SeekToFirst(); + UpdateCurrent(); +} + +void TailingIterator::Seek(const Slice& target) { + if (!IsCurrentVersion()) { + CreateIterators(); + } + + mutable_->Seek(target); + + // We maintain the interval (prev_key_, immutable_->key()] such that there + // are no records with keys within that range in immutable_ other than + // immutable_->key(). Since immutable_ can't change in this version, we don't + // need to do a seek if 'target' belongs to that interval (i.e. immutable_ is + // already at the correct position)! + // + // If options.prefix_seek is used and immutable_ is not valid, seek if target + // has a different prefix than prev_key. + // + // prev_key_ is updated by Next(). SeekImmutable() sets prev_key_ to + // 'target' -- in this case, prev_key_ is included in the interval, so + // prev_inclusive_ has to be set. + + if (!is_prev_set_ || + comparator_->Compare(prev_key_, target) >= !is_prev_inclusive_ || + (immutable_->Valid() && + comparator_->Compare(target, immutable_->key()) > 0) || + (options_.prefix_seek && !IsSamePrefix(target))) { + SeekImmutable(target); + } + + UpdateCurrent(); +} + +void TailingIterator::Next() { + assert(Valid()); + + if (!IsCurrentVersion()) { + // save the current key, create new iterators and then seek + std::string current_key = key().ToString(); + Slice key_slice(current_key.data(), current_key.size()); + + CreateIterators(); + Seek(key_slice); + + if (!Valid() || key().compare(key_slice) != 0) { + // record with current_key no longer exists + return; + } + + } else if (current_ == immutable_.get()) { + // immutable iterator is advanced -- update prev_key_ + prev_key_ = key().ToString(); + is_prev_inclusive_ = false; + is_prev_set_ = true; + } + + current_->Next(); + UpdateCurrent(); +} + +Slice TailingIterator::key() const { + assert(Valid()); + return current_->key(); +} + +Slice TailingIterator::value() const { + assert(Valid()); + return current_->value(); +} + +Status TailingIterator::status() const { + if (!status_.ok()) { + return status_; + } else if (!mutable_->status().ok()) { + return mutable_->status(); + } else { + return immutable_->status(); + } +} + +void TailingIterator::Prev() { + status_ = Status::NotSupported("This iterator doesn't support Prev()"); +} + +void TailingIterator::SeekToLast() { + status_ = Status::NotSupported("This iterator doesn't support SeekToLast()"); +} + +void TailingIterator::CreateIterators() { + std::pair iters = + db_->GetTailingIteratorPair(options_, &version_number_); + + assert(iters.first && iters.second); + + mutable_.reset(iters.first); + immutable_.reset(iters.second); + current_ = nullptr; + is_prev_set_ = false; +} + +void TailingIterator::UpdateCurrent() { + current_ = nullptr; + + if (mutable_->Valid()) { + current_ = mutable_.get(); + } + if (immutable_->Valid() && + (current_ == nullptr || + comparator_->Compare(immutable_->key(), current_->key()) < 0)) { + current_ = immutable_.get(); + } + + if (!status_.ok()) { + // reset status that was set by Prev() or SeekToLast() + status_ = Status::OK(); + } +} + +bool TailingIterator::IsCurrentVersion() const { + return mutable_ != nullptr && immutable_ != nullptr && + version_number_ == db_->CurrentVersionNumber(); +} + +bool TailingIterator::IsSamePrefix(const Slice& target) const { + const SliceTransform* extractor = db_->options_.prefix_extractor; + + assert(extractor); + assert(is_prev_set_); + + return extractor->Transform(target) + .compare(extractor->Transform(prev_key_)) == 0; +} + +void TailingIterator::SeekImmutable(const Slice& target) { + prev_key_ = target.ToString(); + is_prev_inclusive_ = true; + is_prev_set_ = true; + + immutable_->Seek(target); +} + +} // namespace rocksdb diff --git a/db/tailing_iter.h b/db/tailing_iter.h new file mode 100644 index 000000000..3b8343a28 --- /dev/null +++ b/db/tailing_iter.h @@ -0,0 +1,88 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" + +namespace rocksdb { + +class DBImpl; + +/** + * TailingIterator is a special type of iterator that doesn't use an (implicit) + * snapshot. In other words, it can be used to read data that was added to the + * db after the iterator had been created. + * + * TailingIterator is optimized for sequential reading. It doesn't support + * Prev() and SeekToLast() operations. + */ +class TailingIterator : public Iterator { + public: + TailingIterator(DBImpl* db, const ReadOptions& options, + const Comparator* comparator); + virtual ~TailingIterator() {} + + virtual bool Valid() const override; + virtual void SeekToFirst() override; + virtual void SeekToLast() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual void Prev() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + + private: + DBImpl* const db_; + const ReadOptions options_; + const Comparator* const comparator_; + uint64_t version_number_; + + // TailingIterator merges the contents of the two iterators below (one using + // mutable memtable contents only, other over SSTs and immutable memtables). + // See DBIter::GetTailingIteratorPair(). + std::unique_ptr mutable_; + std::unique_ptr immutable_; + + // points to either mutable_ or immutable_ + Iterator* current_; + + // key that precedes immutable iterator's current key + std::string prev_key_; + + // unless prev_set is true, prev_key/prev_head is not valid and shouldn't be + // used; reset by createIterators() + bool is_prev_set_; + + // prev_key_ was set by SeekImmutable(), which means that the interval of + // keys covered by immutable_ is [prev_key_, current], i.e. it includes the + // left endpoint + bool is_prev_inclusive_; + + // internal iterator status + Status status_; + + // check if this iterator's version matches DB's version + bool IsCurrentVersion() const; + + // check if SeekImmutable() is needed due to target having a different prefix + // than prev_key_ (used when options.prefix_seek is set) + bool IsSamePrefix(const Slice& target) const; + + // creates mutable_ and immutable_ iterators and updates version_number_ + void CreateIterators(); + + // set current_ to be one of the iterators with the smallest key + void UpdateCurrent(); + + // seek on immutable_ and update prev_key + void SeekImmutable(const Slice& target); +}; + +} // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index 9003636de..8b4a7adaf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -761,6 +761,28 @@ bool Version::Unref() { return false; } +bool Version::NeedsCompaction() const { + if (file_to_compact_ != nullptr) { + return true; + } + // In universal compaction case, this check doesn't really + // check the compaction condition, but checks num of files threshold + // only. We are not going to miss any compaction opportunity + // but it's likely that more compactions are scheduled but + // ending up with nothing to do. We can improve it later. + // TODO(sdong): improve this function to be accurate for universal + // compactions. + int num_levels_to_check = + (vset_->options_->compaction_style != kCompactionStyleUniversal) ? + NumberLevels() - 1 : 1; + for (int i = 0; i < num_levels_to_check; i++) { + if (compaction_score_[i] >= 1) { + return true; + } + } + return false; +} + bool Version::OverlapInLevel(int level, const Slice* smallest_user_key, const Slice* largest_user_key) { @@ -1418,6 +1440,7 @@ void VersionSet::AppendVersion(Version* v) { } Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, + Directory* db_directory, bool new_descriptor_log) { mu->AssertHeld(); @@ -1546,6 +1569,9 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // of it later env_->DeleteFile(DescriptorFileName(dbname_, old_manifest_file_number)); } + if (!options_->disableDataSync && db_directory != nullptr) { + db_directory->Fsync(); + } } // find offset in manifest file where this version is stored. @@ -1762,6 +1788,78 @@ Status VersionSet::Recover() { return s; } +Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, + const Options* options, + const EnvOptions& storage_options, + int new_levels) { + if (new_levels <= 1) { + return Status::InvalidArgument( + "Number of levels needs to be bigger than 1"); + } + + const InternalKeyComparator cmp(options->comparator); + TableCache tc(dbname, options, storage_options, 10); + VersionSet versions(dbname, options, storage_options, &tc, &cmp); + Status status; + + status = versions.Recover(); + if (!status.ok()) { + return status; + } + + Version* current_version = versions.current(); + int current_levels = current_version->NumberLevels(); + + if (current_levels <= new_levels) { + return Status::OK(); + } + + // Make sure there are file only on one level from + // (new_levels-1) to (current_levels-1) + int first_nonempty_level = -1; + int first_nonempty_level_filenum = 0; + for (int i = new_levels - 1; i < current_levels; i++) { + int file_num = current_version->NumLevelFiles(i); + if (file_num != 0) { + if (first_nonempty_level < 0) { + first_nonempty_level = i; + first_nonempty_level_filenum = file_num; + } else { + char msg[255]; + snprintf(msg, sizeof(msg), + "Found at least two levels containing files: " + "[%d:%d],[%d:%d].\n", + first_nonempty_level, first_nonempty_level_filenum, i, + file_num); + return Status::InvalidArgument(msg); + } + } + } + + std::vector* old_files_list = current_version->files_; + // we need to allocate an array with the old number of levels size to + // avoid SIGSEGV in WriteSnapshot() + // however, all levels bigger or equal to new_levels will be empty + std::vector* new_files_list = + new std::vector[current_levels]; + for (int i = 0; i < new_levels - 1; i++) { + new_files_list[i] = old_files_list[i]; + } + + if (first_nonempty_level > 0) { + new_files_list[new_levels - 1] = old_files_list[first_nonempty_level]; + } + + delete[] current_version->files_; + current_version->files_ = new_files_list; + current_version->num_levels_ = new_levels; + + VersionEdit ve; + port::Mutex dummy_mutex; + MutexLock l(&dummy_mutex); + return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true); +} + Status VersionSet::DumpManifest(Options& options, std::string& dscname, bool verbose, bool hex) { struct LogReporter : public log::Reader::Reporter { diff --git a/db/version_set.h b/db/version_set.h index 9470dc1c3..b0aa565ac 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -101,6 +101,15 @@ class Version { // and return true. Otherwise, return false. bool Unref(); + // Returns true iff some level needs a compaction. + bool NeedsCompaction() const; + + // Returns the maxmimum compaction score for levels 1 to max + double MaxCompactionScore() const { return max_compaction_score_; } + + // See field declaration + int MaxCompactionScoreLevel() const { return max_compaction_score_level_; } + void GetOverlappingInputs( int level, const InternalKey* begin, // nullptr means before all keys @@ -277,6 +286,7 @@ class VersionSet { // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() Status LogAndApply(VersionEdit* edit, port::Mutex* mu, + Directory* db_directory = nullptr, bool new_descriptor_log = false); // Recover the last saved descriptor from persistent storage. @@ -285,10 +295,16 @@ class VersionSet { // Try to reduce the number of levels. This call is valid when // only one level from the new max level to the old // max level containing files. + // The call is static, since number of levels is immutable during + // the lifetime of a RocksDB instance. It reduces number of levels + // in a DB by applying changes to manifest. // For example, a db currently has 7 levels [0-6], and a call to // to reduce to 5 [0-4] can only be executed when only one level // among [4-6] contains files. - Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu); + static Status ReduceNumberOfLevels(const std::string& dbname, + const Options* options, + const EnvOptions& storage_options, + int new_levels); // Return the current version. Version* current() const { return current_; } @@ -364,42 +380,6 @@ class VersionSet { // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c); - // Returns true iff some level needs a compaction because it has - // exceeded its target size. - bool NeedsSizeCompaction() const { - // In universal compaction case, this check doesn't really - // check the compaction condition, but checks num of files threshold - // only. We are not going to miss any compaction opportunity - // but it's likely that more compactions are scheduled but - // ending up with nothing to do. We can improve it later. - // TODO: improve this function to be accurate for universal - // compactions. - int num_levels_to_check = - (options_->compaction_style != kCompactionStyleUniversal) ? - NumberLevels() - 1 : 1; - for (int i = 0; i < num_levels_to_check; i++) { - if (current_->compaction_score_[i] >= 1) { - return true; - } - } - return false; - } - // Returns true iff some level needs a compaction. - bool NeedsCompaction() const { - return ((current_->file_to_compact_ != nullptr) || - NeedsSizeCompaction()); - } - - // Returns the maxmimum compaction score for levels 1 to max - double MaxCompactionScore() const { - return current_->max_compaction_score_; - } - - // See field declaration - int MaxCompactionScoreLevel() const { - return current_->max_compaction_score_level_; - } - // Add all files listed in any live version to *live. void AddLiveFiles(std::vector* live_list); diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc deleted file mode 100644 index 68b84dab1..000000000 --- a/db/version_set_reduce_num_levels.cc +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// -// Copyright (c) 2012 Facebook. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "db/version_set.h" - -#include -#include -#include "db/log_reader.h" -#include "db/log_writer.h" -#include "util/logging.h" - -namespace rocksdb { - -Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { - - if(new_levels <= 1) { - return Status::InvalidArgument( - "Number of levels needs to be bigger than 1"); - } - - Version* current_version = current_; - int current_levels = current_version->NumberLevels(); - - if (current_levels <= new_levels) { - return Status::OK(); - } - - // Make sure there are file only on one level from - // (new_levels-1) to (current_levels-1) - int first_nonempty_level = -1; - int first_nonempty_level_filenum = 0; - for (int i = new_levels - 1; i < current_levels; i++) { - int file_num = current_version->NumLevelFiles(i); - if (file_num != 0) { - if (first_nonempty_level < 0) { - first_nonempty_level = i; - first_nonempty_level_filenum = file_num; - } else { - char msg[255]; - sprintf(msg, "Found at least two levels containing files: " - "[%d:%d],[%d:%d].\n", - first_nonempty_level, first_nonempty_level_filenum, i, file_num); - return Status::InvalidArgument(msg); - } - } - } - - Status st; - std::vector* old_files_list = current_version->files_; - std::vector* new_files_list = - new std::vector[new_levels]; - for (int i = 0; i < new_levels - 1; i++) { - new_files_list[i] = old_files_list[i]; - } - - if (first_nonempty_level > 0) { - new_files_list[new_levels - 1] = old_files_list[first_nonempty_level]; - } - - delete[] current_version->files_; - current_version->files_ = new_files_list; - current_version->num_levels_ = new_levels; - - num_levels_ = new_levels; - compaction_picker_->ReduceNumberOfLevels(new_levels); - VersionEdit ve; - st = LogAndApply(&ve, mu, true); - return st; -} - -} diff --git a/hdfs/env_hdfs.h b/hdfs/env_hdfs.h index cb8ca623f..886ccdac3 100644 --- a/hdfs/env_hdfs.h +++ b/hdfs/env_hdfs.h @@ -70,6 +70,9 @@ class HdfsEnv : public Env { unique_ptr* result, const EnvOptions& options); + virtual Status NewDirectory(const std::string& name, + unique_ptr* result); + virtual bool FileExists(const std::string& fname); virtual Status GetChildren(const std::string& path, @@ -246,6 +249,11 @@ class HdfsEnv : public Env { return notsup; } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return notsup; + } + virtual bool FileExists(const std::string& fname){return false;} virtual Status GetChildren(const std::string& path, diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index 15f1383a6..34d97fa76 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -221,6 +221,11 @@ class WritableFileImpl : public WritableFile { FileState* file_; }; +class InMemoryDirectory : public Directory { + public: + virtual Status Fsync() { return Status::OK(); } +}; + class InMemoryEnv : public EnvWrapper { public: explicit InMemoryEnv(Env* base_env) : EnvWrapper(base_env) { } @@ -274,6 +279,12 @@ class InMemoryEnv : public EnvWrapper { return Status::OK(); } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + result->reset(new InMemoryDirectory()); + return Status::OK(); + } + virtual bool FileExists(const std::string& fname) { MutexLock lock(&mutex_); return file_map_.find(fname) != file_map_.end(); diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index c8542f072..b5821bac2 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -107,6 +107,15 @@ class Cache { // returns the memory size for the entries residing in the cache. virtual size_t GetUsage() const = 0; + // Call this on shutdown if you want to speed it up. Cache will disown + // any underlying data and will not free it on delete. This call will leak + // memory - call this only if you're shutting down the process. + // Any attempts of using cache after this call will fail terribly. + // Always delete the DB object before calling this method! + virtual void DisownData() { + // default implementation is noop + }; + private: void LRU_Remove(Handle* e); void LRU_Append(Handle* e); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 4bf095756..be86a78f6 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -215,9 +215,9 @@ class DB { // hosting all the files. In this case, client could set reduce_level // to true, to move the files back to the minimum level capable of holding // the data set or a given level (specified by non-negative target_level). - virtual void CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, - int target_level = -1) = 0; + virtual Status CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false, + int target_level = -1) = 0; // Number of levels used for this DB. virtual int NumberLevels() = 0; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 73acbfabe..06e9b94aa 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -33,6 +33,7 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; +class Directory; struct Options; using std::unique_ptr; @@ -122,6 +123,16 @@ class Env { unique_ptr* result, const EnvOptions& options) = 0; + // Create an object that represents a directory. Will fail if directory + // doesn't exist. If the directory exists, it will open the directory + // and create a new Directory object. + // + // On success, stores a pointer to the new Directory in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) = 0; + // Returns true iff the named file exists. virtual bool FileExists(const std::string& fname) = 0; @@ -488,6 +499,15 @@ class RandomRWFile { void operator=(const RandomRWFile&); }; +// Directory object represents collection of files and implements +// filesystem operations that can be executed on directories. +class Directory { + public: + virtual ~Directory() {} + // Fsync directory + virtual Status Fsync() = 0; +}; + // An interface for writing log messages. class Logger { public: @@ -578,6 +598,10 @@ class EnvWrapper : public Env { const EnvOptions& options) { return target_->NewRandomRWFile(f, r, options); } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return target_->NewDirectory(name, result); + } bool FileExists(const std::string& f) { return target_->FileExists(f); } Status GetChildren(const std::string& dir, std::vector* r) { return target_->GetChildren(dir, r); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 3cb03c7fc..15906ceed 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -37,12 +37,14 @@ #define STORAGE_ROCKSDB_DB_MEMTABLEREP_H_ #include -#include "rocksdb/arena.h" -#include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" namespace rocksdb { +class Arena; +class Slice; +class SliceTransform; + class MemTableRep { public: // KeyComparator provides a means to compare keys, which are internal keys diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 672c66ed8..219f05630 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -15,11 +15,6 @@ #include #include -#include "rocksdb/memtablerep.h" -#include "rocksdb/slice.h" -#include "rocksdb/slice_transform.h" -#include "rocksdb/statistics.h" -#include "rocksdb/table_properties.h" #include "rocksdb/universal_compaction.h" namespace rocksdb { @@ -34,6 +29,11 @@ class Logger; class MergeOperator; class Snapshot; class TableFactory; +class MemTableRepFactory; +class TablePropertiesCollector; +class Slice; +class SliceTransform; +class Statistics; using std::shared_ptr; @@ -772,20 +772,27 @@ struct ReadOptions { // Default: kReadAllTier ReadTier read_tier; + // Specify to create a tailing iterator -- a special iterator that has a + // view of the complete database (i.e. it can also be used to read newly + // added data) and is optimized for sequential reads. + bool tailing; + ReadOptions() : verify_checksums(false), fill_cache(true), prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) {} + read_tier(kReadAllTier), + tailing(false) {} ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) {} + read_tier(kReadAllTier), + tailing(false) {} }; // Options that control write operations diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index ec862b95f..eaf47e5c7 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -6,14 +6,8 @@ #ifndef STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H #define STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H -#include -#include -#include -#include #include #include -#include "rocksdb/slice.h" -#include "rocksdb/statistics.h" namespace rocksdb { diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 908fe10b7..69a690fc8 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -85,9 +85,9 @@ class StackableDB : public DB { return db_->GetApproximateSizes(r, n, sizes); } - virtual void CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, - int target_level = -1) override { + virtual Status CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false, + int target_level = -1) override { return db_->CompactRange(begin, end, reduce_level, target_level); } diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 836f6edf6..a9cd35a68 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -20,10 +20,10 @@ namespace rocksdb { Status BlockBasedTableFactory::GetTableReader( const Options& options, const EnvOptions& soptions, - unique_ptr && file, uint64_t file_size, + unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const { - return BlockBasedTable::Open(options, soptions, std::move(file), file_size, - table_reader); + return BlockBasedTable::Open(options, soptions, table_options_, + std::move(file), file_size, table_reader); } TableBuilder* BlockBasedTableFactory::GetTableBuilder( diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index ee525816f..5a4d1bd6e 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -14,6 +14,7 @@ #include "rocksdb/flush_block_policy.h" #include "rocksdb/options.h" #include "rocksdb/table.h" +#include "table/block_based_table_options.h" namespace rocksdb { @@ -30,40 +31,25 @@ class BlockBasedTable; class BlockBasedTableBuilder; class BlockBasedTableFactory: public TableFactory { -public: - struct TableOptions { - // @flush_block_policy_factory creates the instances of flush block policy. - // which provides a configurable way to determine when to flush a block in - // the block based tables. If not set, table builder will use the default - // block flush policy, which cut blocks by block size (please refer to - // `FlushBlockBySizePolicy`). - std::shared_ptr flush_block_policy_factory; - }; + public: + BlockBasedTableFactory() : BlockBasedTableFactory(BlockBasedTableOptions()) {} + explicit BlockBasedTableFactory(const BlockBasedTableOptions& table_options) + : table_options_(table_options) {} - BlockBasedTableFactory() : BlockBasedTableFactory(TableOptions()) { } - BlockBasedTableFactory(const TableOptions& table_options): - table_options_(table_options) { - } + ~BlockBasedTableFactory() {} - ~BlockBasedTableFactory() { - } - - const char* Name() const override { - return "BlockBasedTable"; - } + const char* Name() const override { return "BlockBasedTable"; } Status GetTableReader(const Options& options, const EnvOptions& soptions, - unique_ptr && file, - uint64_t file_size, + unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const override; TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, - CompressionType compression_type) const - override; + CompressionType compression_type) + const override; private: - TableOptions table_options_; + BlockBasedTableOptions table_options_; }; - } // namespace rocksdb diff --git a/table/block_based_table_options.h b/table/block_based_table_options.h new file mode 100644 index 000000000..f5774e2bf --- /dev/null +++ b/table/block_based_table_options.h @@ -0,0 +1,31 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#include + +namespace rocksdb { + +class FlushBlockPolicyFactory; + +struct BlockBasedTableOptions { + // @flush_block_policy_factory creates the instances of flush block policy. + // which provides a configurable way to determine when to flush a block in + // the block based tables. If not set, table builder will use the default + // block flush policy, which cut blocks by block size (please refer to + // `FlushBlockBySizePolicy`). + std::shared_ptr flush_block_policy_factory; + + // TODO(kailiu) Temporarily disable this feature by making the default value + // to be false. Also in master branch, this file is non-public so no user + // will be able to change the value of `cache_index_and_filter_blocks`. + // + // Indicating if we'd put index/filter blocks to the block cache. + // If not specified, each "table reader" object will pre-load index/filter + // block during table initialization. + bool cache_index_and_filter_blocks = false; +}; + +} // namespace rocksdb diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index dd6d0e7ae..8f7470330 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -27,6 +27,7 @@ #include "util/coding.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "table/block_based_table_options.h" namespace rocksdb { @@ -48,9 +49,9 @@ struct BlockBasedTable::Rep { Status status; unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; - size_t cache_key_prefix_size; + size_t cache_key_prefix_size = 0; char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize]; - size_t compressed_cache_key_prefix_size; + size_t compressed_cache_key_prefix_size = 0; // Handle to metaindex_block: saved from footer BlockHandle metaindex_handle; @@ -223,15 +224,15 @@ Cache::Handle* GetFromBlockCache( } // end of anonymous namespace -Status BlockBasedTable::Open(const Options& options, - const EnvOptions& soptions, - unique_ptr && file, - uint64_t size, +Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, + const BlockBasedTableOptions& table_options, + unique_ptr&& file, + uint64_t file_size, unique_ptr* table_reader) { table_reader->reset(); Footer footer(kBlockBasedTableMagicNumber); - auto s = ReadFooterFromFile(file.get(), size, &footer); + auto s = ReadFooterFromFile(file.get(), file_size, &footer); if (!s.ok()) return s; // We've successfully read the footer and the index block: we're @@ -254,13 +255,8 @@ Status BlockBasedTable::Open(const Options& options, if (meta_iter->Valid() && meta_iter->key() == kPropertiesBlock) { s = meta_iter->status(); if (s.ok()) { - s = ReadProperties( - meta_iter->value(), - rep->file.get(), - rep->options.env, - rep->options.info_log.get(), - &rep->table_properties - ); + s = ReadProperties(meta_iter->value(), rep->file.get(), rep->options.env, + rep->options.info_log.get(), &rep->table_properties); } if (!s.ok()) { @@ -271,11 +267,21 @@ Status BlockBasedTable::Open(const Options& options, } } - // Initialize index/filter blocks. If block cache is not specified, - // these blocks will be kept in member variables in Rep, which will - // reside in the memory as long as this table object is alive; otherwise - // they will be added to block cache. - if (!options.block_cache) { + // Will use block cache for index/filter blocks access? + if (options.block_cache && table_options.cache_index_and_filter_blocks) { + // Call IndexBlockReader() to implicitly add index to the block_cache + unique_ptr iter(new_table->IndexBlockReader(ReadOptions())); + s = iter->status(); + + if (s.ok()) { + // Call GetFilter() to implicitly add filter to the block_cache + auto filter_entry = new_table->GetFilter(); + filter_entry.Release(options.block_cache.get()); + } + } else { + // If we don't use block cache for index/filter blocks access, we'll + // pre-load these blocks, which will kept in member variables in Rep + // and with a same life-time as this table object. Block* index_block = nullptr; // TODO: we never really verify check sum for index block s = ReadBlockFromFile( @@ -303,18 +309,7 @@ Status BlockBasedTable::Open(const Options& options, } else { delete index_block; } - } else { - // Call IndexBlockReader() to implicitly add index to the block_cache - unique_ptr iter( - new_table->IndexBlockReader(ReadOptions()) - ); - s = iter->status(); - if (s.ok()) { - // Call GetFilter() to implicitly add filter to the block_cache - auto filter_entry = new_table->GetFilter(); - filter_entry.Release(options.block_cache.get()); - } } if (s.ok()) { @@ -740,7 +735,6 @@ BlockBasedTable::GetFilter(bool no_io) const { // Get the iterator from the index block. Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { if (rep_->index_block) { - assert (!rep_->options.block_cache); return rep_->index_block->NewIterator(rep_->options.comparator); } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 3485a4534..34411f87f 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -29,6 +29,7 @@ struct ReadOptions; class TableCache; class TableReader; class FilterBlockReader; +struct BlockBasedTableOptions; using std::unique_ptr; @@ -49,10 +50,9 @@ class BlockBasedTable : public TableReader { // to nullptr and returns a non-ok status. // // *file must remain live while this Table is in use. - static Status Open(const Options& options, - const EnvOptions& soptions, - unique_ptr&& file, - uint64_t file_size, + static Status Open(const Options& db_options, const EnvOptions& env_options, + const BlockBasedTableOptions& table_options, + unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader); bool PrefixMayMatch(const Slice& internal_prefix) override; diff --git a/table/merger.h b/table/merger.h index 74f46ac9b..ea8daa770 100644 --- a/table/merger.h +++ b/table/merger.h @@ -23,7 +23,8 @@ class Env; // key is present in K child iterators, it will be yielded K times. // // REQUIRES: n >= 0 -extern Iterator* NewMergingIterator( - Env* const env, const Comparator* comparator, Iterator** children, int n); +extern Iterator* NewMergingIterator(Env* const env, + const Comparator* comparator, + Iterator** children, int n); } // namespace rocksdb diff --git a/table/meta_blocks.h b/table/meta_blocks.h index a773c7b38..9f236eff6 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -11,6 +11,7 @@ #include "rocksdb/comparator.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" +#include "rocksdb/table_properties.h" #include "table/block_builder.h" namespace rocksdb { diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 144c4686a..4e866219e 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -10,6 +10,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/table.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/plain_table_factory.h" namespace rocksdb { @@ -38,7 +39,7 @@ using std::unordered_map; class PlainTableReader: public TableReader { public: static Status Open(const Options& options, const EnvOptions& soptions, - unique_ptr && file, uint64_t file_size, + unique_ptr&& file, uint64_t file_size, unique_ptr* table, const int bloom_num_bits, double hash_table_ratio); @@ -196,7 +197,7 @@ class PlainTableReader: public TableReader { uint32_t& ret_offset); Slice GetPrefix(const Slice& target) { - assert(target.size() >= 8); // target is internal key + assert(target.size() >= 8); // target is internal key return options_.prefix_extractor->Transform( Slice(target.data(), target.size() - 8)); } diff --git a/table/table_test.cc b/table/table_test.cc index 9b4d6d808..4f53ec4da 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -293,14 +293,11 @@ class KeyConvertingIterator: public Iterator { class TableConstructor: public Constructor { public: - explicit TableConstructor( - const Comparator* cmp, bool convert_to_internal_key = false) - : Constructor(cmp), - convert_to_internal_key_(convert_to_internal_key) { - } - ~TableConstructor() { - Reset(); - } + explicit TableConstructor(const Comparator* cmp, + bool convert_to_internal_key = false) + : Constructor(cmp), convert_to_internal_key_(convert_to_internal_key) {} + ~TableConstructor() { Reset(); } + virtual Status FinishImpl(const Options& options, const KVMap& data) { Reset(); sink_.reset(new StringSink()); @@ -329,14 +326,11 @@ class TableConstructor: public Constructor { // Open the table uniq_id_ = cur_uniq_id_++; - source_.reset( - new StringSource(sink_->contents(), uniq_id_, - options.allow_mmap_reads)); - unique_ptr table_factory; - return options.table_factory->GetTableReader(options, soptions, - std::move(source_), - sink_->contents().size(), - &table_reader_); + source_.reset(new StringSource(sink_->contents(), uniq_id_, + options.allow_mmap_reads)); + return options.table_factory->GetTableReader( + options, soptions, std::move(source_), sink_->contents().size(), + &table_reader_); } virtual Iterator* NewIterator() const { @@ -630,7 +624,7 @@ class Harness { internal_comparator_.reset(new InternalKeyComparator(options_.comparator)); support_prev_ = true; only_support_prefix_seek_ = false; - BlockBasedTableFactory::TableOptions table_options; + BlockBasedTableOptions table_options; switch (args.type) { case BLOCK_BASED_TABLE_TEST: table_options.flush_block_policy_factory.reset( @@ -1053,6 +1047,11 @@ TEST(BlockBasedTableTest, BlockCacheTest) { options.create_if_missing = true; options.statistics = CreateDBStatistics(); options.block_cache = NewLRUCache(1024); + + // Enable the cache for index/filter blocks + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + options.table_factory.reset(new BlockBasedTableFactory(table_options)); std::vector keys; KVMap kvmap; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 8321c7eaf..bad2cf0d6 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -31,6 +31,8 @@ #include "utilities/utility_db.h" #include "rocksdb/env.h" #include "rocksdb/write_batch.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "port/port.h" #include "util/coding.h" diff --git a/util/cache.cc b/util/cache.cc index 143c6957a..8f7deaaa8 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -417,6 +417,7 @@ class ShardedLRUCache : public Cache { virtual size_t GetCapacity() const { return capacity_; } + virtual size_t GetUsage() const { // We will not lock the cache when getting the usage from shards. // for (size_t i = 0; i < num_shard_bits_; ++i) @@ -427,6 +428,10 @@ class ShardedLRUCache : public Cache { } return usage; } + + virtual void DisownData() { + shards_ = nullptr; + } }; } // end anonymous namespace diff --git a/util/coding.cc b/util/coding.cc index c858697f8..31ae0e356 100644 --- a/util/coding.cc +++ b/util/coding.cc @@ -8,7 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "util/coding.h" + #include +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" namespace rocksdb { diff --git a/util/coding.h b/util/coding.h index 6dd8cb20f..8ffba51cb 100644 --- a/util/coding.h +++ b/util/coding.h @@ -17,6 +17,8 @@ #include #include #include + +#include "rocksdb/write_batch.h" #include "port/port.h" namespace rocksdb { diff --git a/util/env_hdfs.cc b/util/env_hdfs.cc index 0f8fe0d11..67f0ef797 100644 --- a/util/env_hdfs.cc +++ b/util/env_hdfs.cc @@ -366,6 +366,11 @@ Status HdfsEnv::NewRandomRWFile(const std::string& fname, return Status::NotSupported("NewRandomRWFile not supported on HdfsEnv"); } +virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + return Status::NotSupported("NewDirectory not yet supported on HdfsEnv"); +} + bool HdfsEnv::FileExists(const std::string& fname) { int value = hdfsExists(fileSys_, fname.c_str()); if (value == 0) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 3db0fd62e..9ce455079 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -873,6 +873,24 @@ class PosixRandomRWFile : public RandomRWFile { #endif }; +class PosixDirectory : public Directory { + public: + explicit PosixDirectory(int fd) : fd_(fd) {} + ~PosixDirectory() { + close(fd_); + } + + virtual Status Fsync() { + if (fsync(fd_) == -1) { + return IOError("directory", errno); + } + return Status::OK(); + } + + private: + int fd_; +}; + static int LockOrUnlock(const std::string& fname, int fd, bool lock) { mutex_lockedFiles.Lock(); if (lock) { @@ -1044,6 +1062,18 @@ class PosixEnv : public Env { return s; } + virtual Status NewDirectory(const std::string& name, + unique_ptr* result) { + result->reset(); + const int fd = open(name.c_str(), 0); + if (fd < 0) { + return IOError(name, errno); + } else { + result->reset(new PosixDirectory(fd)); + } + return Status::OK(); + } + virtual bool FileExists(const std::string& fname) { return access(fname.c_str(), F_OK) == 0; } diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 65ecd61a2..80f609cd4 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1069,23 +1069,8 @@ void ReduceDBLevelsCommand::DoCommand() { CloseDB(); EnvOptions soptions; - TableCache tc(db_path_, &opt, soptions, 10); - const InternalKeyComparator cmp(opt.comparator); - VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); - // We rely the VersionSet::Recover to tell us the internal data structures - // in the db. And the Recover() should never do any change (like LogAndApply) - // to the manifest file. - st = versions.Recover(); - if (!st.ok()) { - exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); - return; - } - - port::Mutex mu; - mu.Lock(); - st = versions.ReduceNumberOfLevels(new_levels_, &mu); - mu.Unlock(); + st = VersionSet::ReduceNumberOfLevels(db_path_, &opt, soptions, new_levels_); if (!st.ok()) { exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); return; diff --git a/util/options.cc b/util/options.cc index 918b87281..6f39a2464 100644 --- a/util/options.cc +++ b/util/options.cc @@ -17,6 +17,10 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/merge_operator.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" namespace rocksdb { diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 26bdd254b..6048082d8 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -10,6 +10,7 @@ #include "utilities/backupable_db.h" #include "db/filename.h" #include "util/coding.h" +#include "util/crc32c.h" #include "rocksdb/transaction_log.h" #define __STDC_FORMAT_MACROS @@ -21,6 +22,7 @@ #include #include #include +#include namespace rocksdb { @@ -47,12 +49,22 @@ class BackupEngine { void DeleteBackupsNewerThan(uint64_t sequence_number); private: + struct FileInfo { + FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) + : refs(0), filename(fname), size(sz), checksum_value(checksum) {} + + int refs; + const std::string filename; + const uint64_t size; + uint32_t checksum_value; + }; + class BackupMeta { public: BackupMeta(const std::string& meta_filename, - std::unordered_map* file_refs, Env* env) + std::unordered_map* file_infos, Env* env) : timestamp_(0), size_(0), meta_filename_(meta_filename), - file_refs_(file_refs), env_(env) {} + file_infos_(file_infos), env_(env) {} ~BackupMeta() {} @@ -72,7 +84,8 @@ class BackupEngine { return sequence_number_; } - void AddFile(const std::string& filename, uint64_t size); + Status AddFile(const FileInfo& file_info); + void Delete(); bool Empty() { @@ -95,7 +108,7 @@ class BackupEngine { std::string const meta_filename_; // files with relative paths (without "/" prefix!!) std::vector files_; - std::unordered_map* file_refs_; + std::unordered_map* file_infos_; Env* env_; static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB @@ -140,6 +153,7 @@ class BackupEngine { Env* dst_env, bool sync, uint64_t* size = nullptr, + uint32_t* checksum_value = nullptr, uint64_t size_limit = 0); // if size_limit == 0, there is no size limit, copy everything Status BackupFile(BackupID backup_id, @@ -148,15 +162,21 @@ class BackupEngine { const std::string& src_dir, const std::string& src_fname, // starts with "/" uint64_t size_limit = 0); + + Status CalculateChecksum(const std::string& src, + Env* src_env, + uint64_t size_limit, + uint32_t* checksum_value); + // Will delete all the files we don't need anymore // If full_scan == true, it will do the full scan of files/ directory - // and delete all the files that are not referenced from backuped_file_refs_ + // and delete all the files that are not referenced from backuped_file_infos__ void GarbageCollection(bool full_scan); // backup state data BackupID latest_backup_id_; std::map backups_; - std::unordered_map backuped_file_refs_; + std::unordered_map backuped_file_infos_; std::vector obsolete_backups_; std::atomic stop_backup_; @@ -197,7 +217,7 @@ BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options) assert(backups_.find(backup_id) == backups_.end()); backups_.insert(std::make_pair( backup_id, BackupMeta(GetBackupMetaFile(backup_id), - &backuped_file_refs_, backup_env_))); + &backuped_file_infos_, backup_env_))); } if (options_.destroy_old_data) { // Destory old data @@ -301,7 +321,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) { assert(backups_.find(new_backup_id) == backups_.end()); auto ret = backups_.insert(std::make_pair( new_backup_id, BackupMeta(GetBackupMetaFile(new_backup_id), - &backuped_file_refs_, backup_env_))); + &backuped_file_infos_, backup_env_))); assert(ret.second == true); auto& new_backup = ret.first->second; new_backup.RecordTimestamp(); @@ -477,10 +497,19 @@ Status BackupEngine::RestoreDBFromBackup(BackupID backup_id, "/" + dst; Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); - s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false); + uint32_t checksum_value; + s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false, + nullptr /* size */, &checksum_value); if (!s.ok()) { break; } + + const auto iter = backuped_file_infos_.find(file); + assert(iter != backuped_file_infos_.end()); + if (iter->second.checksum_value != checksum_value) { + s = Status::Corruption("Checksum check failed"); + break; + } } Log(options_.info_log, "Restoring done -- %s\n", s.ToString().c_str()); @@ -554,6 +583,7 @@ Status BackupEngine::CopyFile(const std::string& src, Env* dst_env, bool sync, uint64_t* size, + uint32_t* checksum_value, uint64_t size_limit) { Status s; unique_ptr dst_file; @@ -563,6 +593,9 @@ Status BackupEngine::CopyFile(const std::string& src, if (size != nullptr) { *size = 0; } + if (checksum_value != nullptr) { + *checksum_value = 0; + } // Check if size limit is set. if not, set it to very big number if (size_limit == 0) { @@ -588,12 +621,19 @@ Status BackupEngine::CopyFile(const std::string& src, copy_file_buffer_size_ : size_limit; s = src_file->Read(buffer_to_read, &data, buf.get()); size_limit -= data.size(); + + if (!s.ok()) { + return s; + } + if (size != nullptr) { *size += data.size(); } - if (s.ok()) { - s = dst_file->Append(data); + if (checksum_value != nullptr) { + *checksum_value = crc32c::Extend(*checksum_value, data.data(), + data.size()); } + s = dst_file->Append(data); } while (s.ok() && data.size() > 0 && size_limit > 0); if (s.ok() && sync) { @@ -628,9 +668,15 @@ Status BackupEngine::BackupFile(BackupID backup_id, // if it's shared, we also need to check if it exists -- if it does, // no need to copy it again + uint32_t checksum_value = 0; if (shared && backup_env_->FileExists(dst_path)) { backup_env_->GetFileSize(dst_path, &size); // Ignore error - Log(options_.info_log, "%s already present", src_fname.c_str()); + Log(options_.info_log, "%s already present, calculate checksum", + src_fname.c_str()); + s = CalculateChecksum(src_dir + src_fname, + db_env_, + size_limit, + &checksum_value); } else { Log(options_.info_log, "Copying %s", src_fname.c_str()); s = CopyFile(src_dir + src_fname, @@ -639,22 +685,63 @@ Status BackupEngine::BackupFile(BackupID backup_id, backup_env_, options_.sync, &size, + &checksum_value, size_limit); if (s.ok() && shared) { s = backup_env_->RenameFile(dst_path_tmp, dst_path); } } if (s.ok()) { - backup->AddFile(dst_relative, size); + s = backup->AddFile(FileInfo(dst_relative, size, checksum_value)); + } + return s; +} + +Status BackupEngine::CalculateChecksum(const std::string& src, + Env* src_env, + uint64_t size_limit, + uint32_t* checksum_value) { + *checksum_value = 0; + if (size_limit == 0) { + size_limit = std::numeric_limits::max(); + } + + EnvOptions env_options; + env_options.use_mmap_writes = false; + + std::unique_ptr src_file; + Status s = src_env->NewSequentialFile(src, &src_file, env_options); + if (!s.ok()) { + return s; } + + std::unique_ptr buf(new char[copy_file_buffer_size_]); + Slice data; + + do { + if (stop_backup_.load(std::memory_order_acquire)) { + return Status::Incomplete("Backup stopped"); + } + size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ? + copy_file_buffer_size_ : size_limit; + s = src_file->Read(buffer_to_read, &data, buf.get()); + + if (!s.ok()) { + return s; + } + + size_limit -= data.size(); + *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size()); + } while (data.size() > 0 && size_limit > 0); + return s; } void BackupEngine::GarbageCollection(bool full_scan) { Log(options_.info_log, "Starting garbage collection"); std::vector to_delete; - for (auto& itr : backuped_file_refs_) { - if (itr.second == 0) { + for (auto& itr : backuped_file_infos_) { + if (itr.second.refs == 0) { Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first)); Log(options_.info_log, "Deleting %s -- %s", itr.first.c_str(), s.ToString().c_str()); @@ -662,7 +749,7 @@ void BackupEngine::GarbageCollection(bool full_scan) { } } for (auto& td : to_delete) { - backuped_file_refs_.erase(td); + backuped_file_infos_.erase(td); } if (!full_scan) { // take care of private dirs -- if full_scan == true, then full_scan will @@ -685,7 +772,7 @@ void BackupEngine::GarbageCollection(bool full_scan) { for (auto& child : shared_children) { std::string rel_fname = GetSharedFileRel(child); // if it's not refcounted, delete it - if (backuped_file_refs_.find(rel_fname) == backuped_file_refs_.end()) { + if (backuped_file_infos_.find(rel_fname) == backuped_file_infos_.end()) { // this might be a directory, but DeleteFile will just fail in that // case, so we're good Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname)); @@ -730,23 +817,34 @@ void BackupEngine::GarbageCollection(bool full_scan) { // ------- BackupMeta class -------- -void BackupEngine::BackupMeta::AddFile(const std::string& filename, - uint64_t size) { - size_ += size; - files_.push_back(filename); - auto itr = file_refs_->find(filename); - if (itr == file_refs_->end()) { - file_refs_->insert(std::make_pair(filename, 1)); +Status BackupEngine::BackupMeta::AddFile(const FileInfo& file_info) { + size_ += file_info.size; + files_.push_back(file_info.filename); + + auto itr = file_infos_->find(file_info.filename); + if (itr == file_infos_->end()) { + auto ret = file_infos_->insert({file_info.filename, file_info}); + if (ret.second) { + ret.first->second.refs = 1; + } else { + // if this happens, something is seriously wrong + return Status::Corruption("In memory metadata insertion error"); + } } else { - ++itr->second; // increase refcount if already present + if (itr->second.checksum_value != file_info.checksum_value) { + return Status::Corruption("Checksum mismatch for existing backup file"); + } + ++itr->second.refs; // increase refcount if already present } + + return Status::OK(); } void BackupEngine::BackupMeta::Delete() { - for (auto& file : files_) { - auto itr = file_refs_->find(file); - assert(itr != file_refs_->end()); - --(itr->second); // decrease refcount + for (const auto& file : files_) { + auto itr = file_infos_->find(file); + assert(itr != file_infos_->end()); + --(itr->second.refs); // decrease refcount } files_.clear(); // delete meta file @@ -758,8 +856,8 @@ void BackupEngine::BackupMeta::Delete() { // // // -// -// +// +// // ... // TODO: maybe add checksum? Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { @@ -789,18 +887,40 @@ Status BackupEngine::BackupMeta::LoadFromFile(const std::string& backup_dir) { sscanf(data.data(), "%u%n", &num_files, &bytes_read); data.remove_prefix(bytes_read + 1); // +1 for '\n' - std::vector> files; + std::vector files; for (uint32_t i = 0; s.ok() && i < num_files; ++i) { - std::string filename = GetSliceUntil(&data, '\n').ToString(); + auto line = GetSliceUntil(&data, '\n'); + std::string filename = GetSliceUntil(&line, ' ').ToString(); + uint64_t size; s = env_->GetFileSize(backup_dir + "/" + filename, &size); - files.push_back(std::make_pair(filename, size)); + + if (line.empty()) { + return Status::Corruption("File checksum is missing"); + } + + uint32_t checksum_value = 0; + if (line.starts_with("crc32 ")) { + line.remove_prefix(6); + sscanf(line.data(), "%u", &checksum_value); + if (memcmp(line.data(), std::to_string(checksum_value).c_str(), + line.size() - 1) != 0) { + return Status::Corruption("Invalid checksum value"); + } + } else { + return Status::Corruption("Unknown checksum type"); + } + + files.emplace_back(filename, size, checksum_value); } if (s.ok()) { - for (auto file : files) { - AddFile(file.first, file.second); + for (const auto& file_info : files) { + s = AddFile(file_info); + if (!s.ok()) { + break; + } } } @@ -824,8 +944,13 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) { len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n", sequence_number_); len += snprintf(buf.get() + len, buf_size - len, "%zu\n", files_.size()); - for (size_t i = 0; i < files_.size(); ++i) { - len += snprintf(buf.get() + len, buf_size - len, "%s\n", files_[i].c_str()); + for (const auto& file : files_) { + const auto& iter = file_infos_->find(file); + + assert(iter != file_infos_->end()); + // use crc32 for now, switch to something else if needed + len += snprintf(buf.get() + len, buf_size - len, "%s crc32 %u\n", + file.c_str(), iter->second.checksum_value); } s = backup_meta_file->Append(Slice(buf.get(), (size_t)len)); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index de240558f..c5909f8e7 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -154,7 +154,6 @@ class TestEnv : public EnvWrapper { Status NewSequentialFile(const std::string& f, unique_ptr* r, const EnvOptions& options) { - opened_files_.push_back(f); if (dummy_sequential_file_) { r->reset(new TestEnv::DummySequentialFile()); return Status::OK(); @@ -165,6 +164,7 @@ class TestEnv : public EnvWrapper { Status NewWritableFile(const std::string& f, unique_ptr* r, const EnvOptions& options) { + written_files_.push_back(f); if (limit_written_files_ <= 0) { return Status::IOError("Sorry, can't do this"); } @@ -172,14 +172,14 @@ class TestEnv : public EnvWrapper { return EnvWrapper::NewWritableFile(f, r, options); } - void AssertOpenedFiles(std::vector& should_have_opened) { - sort(should_have_opened.begin(), should_have_opened.end()); - sort(opened_files_.begin(), opened_files_.end()); - ASSERT_TRUE(opened_files_ == should_have_opened); + void AssertWrittenFiles(std::vector& should_have_written) { + sort(should_have_written.begin(), should_have_written.end()); + sort(written_files_.begin(), written_files_.end()); + ASSERT_TRUE(written_files_ == should_have_written); } - void ClearOpenedFiles() { - opened_files_.clear(); + void ClearWrittenFiles() { + written_files_.clear(); } void SetLimitWrittenFiles(uint64_t limit) { @@ -192,7 +192,7 @@ class TestEnv : public EnvWrapper { private: bool dummy_sequential_file_ = false; - std::vector opened_files_; + std::vector written_files_; uint64_t limit_written_files_ = 1000000; }; // TestEnv @@ -239,6 +239,46 @@ class FileManager : public EnvWrapper { return s; } + Status CorruptChecksum(const std::string& fname, bool appear_valid) { + std::string metadata; + Status s = ReadFileToString(this, fname, &metadata); + if (!s.ok()) { + return s; + } + s = DeleteFile(fname); + if (!s.ok()) { + return s; + } + + std::vector positions; + auto pos = metadata.find(" crc32 "); + if (pos == std::string::npos) { + return Status::Corruption("checksum not found"); + } + do { + positions.push_back(pos); + pos = metadata.find(" crc32 ", pos + 6); + } while (pos != std::string::npos); + + pos = positions[rnd_.Next() % positions.size()]; + if (metadata.size() < pos + 7) { + return Status::Corruption("bad CRC32 checksum value"); + } + + if (appear_valid) { + if (metadata[pos + 8] == '\n') { + // single digit value, safe to insert one more digit + metadata.insert(pos + 8, 1, '0'); + } else { + metadata.erase(pos + 8, 1); + } + } else { + metadata[pos + 7] = 'a'; + } + + return WriteToFile(fname, metadata); + } + Status WriteToFile(const std::string& fname, const std::string& data) { unique_ptr file; EnvOptions env_options; @@ -249,6 +289,7 @@ class FileManager : public EnvWrapper { } return file->Append(Slice(data)); } + private: Random rnd_; }; // FileManager @@ -412,30 +453,43 @@ TEST(BackupableDBTest, NoDoubleCopy) { // should write 5 DB files + LATEST_BACKUP + one meta file test_backup_env_->SetLimitWrittenFiles(7); - test_db_env_->ClearOpenedFiles(); + test_backup_env_->ClearWrittenFiles(); test_db_env_->SetLimitWrittenFiles(0); dummy_db_->live_files_ = { "/00010.sst", "/00011.sst", "/CURRENT", "/MANIFEST-01" }; dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; ASSERT_OK(db_->CreateNewBackup(false)); - std::vector should_have_openened = dummy_db_->live_files_; - should_have_openened.push_back("/00011.log"); - AppendPath(dbname_, should_have_openened); - test_db_env_->AssertOpenedFiles(should_have_openened); + std::vector should_have_written = { + "/shared/00010.sst.tmp", + "/shared/00011.sst.tmp", + "/private/1.tmp/CURRENT", + "/private/1.tmp/MANIFEST-01", + "/private/1.tmp/00011.log", + "/meta/1.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); // should write 4 new DB files + LATEST_BACKUP + one meta file // should not write/copy 00010.sst, since it's already there! test_backup_env_->SetLimitWrittenFiles(6); - test_db_env_->ClearOpenedFiles(); + test_backup_env_->ClearWrittenFiles(); dummy_db_->live_files_ = { "/00010.sst", "/00015.sst", "/CURRENT", "/MANIFEST-01" }; dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; ASSERT_OK(db_->CreateNewBackup(false)); // should not open 00010.sst - it's already there - should_have_openened = { "/00015.sst", "/CURRENT", - "/MANIFEST-01", "/00011.log" }; - AppendPath(dbname_, should_have_openened); - test_db_env_->AssertOpenedFiles(should_have_openened); + should_have_written = { + "/shared/00015.sst.tmp", + "/private/2.tmp/CURRENT", + "/private/2.tmp/MANIFEST-01", + "/private/2.tmp/00011.log", + "/meta/2.tmp", + "/LATEST_BACKUP.tmp" + }; + AppendPath(dbname_ + "_backup", should_have_written); + test_backup_env_->AssertWrittenFiles(should_have_written); ASSERT_OK(db_->DeleteBackup(1)); ASSERT_EQ(true, @@ -463,6 +517,8 @@ TEST(BackupableDBTest, NoDoubleCopy) { // 3. Corrupted backup meta file or missing backuped file - we should // not be able to open that backup, but all other backups should be // fine +// 4. Corrupted checksum value - if the checksum is not a valid uint32_t, +// db open should fail, otherwise, it aborts during the restore process. TEST(BackupableDBTest, CorruptionsTest) { const int keys_iteration = 5000; Random rnd(6); @@ -519,12 +575,29 @@ TEST(BackupableDBTest, CorruptionsTest) { CloseRestoreDB(); ASSERT_TRUE(!s.ok()); - // new backup should be 4! + // --------- case 4. corrupted checksum value ---- + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/3", false)); + // checksum of backup 3 is an invalid value, this can be detected at + // db open time, and it reverts to the previous backup automatically + AssertBackupConsistency(0, 0, keys_iteration * 2, keys_iteration * 5); + // checksum of the backup 2 appears to be valid, this can cause checksum + // mismatch and abort restore process + ASSERT_OK(file_manager_->CorruptChecksum(backupdir_ + "/meta/2", true)); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + OpenRestoreDB(); + ASSERT_TRUE(file_manager_->FileExists(backupdir_ + "/meta/2")); + s = restore_db_->RestoreDBFromBackup(2, dbname_, dbname_); + ASSERT_TRUE(!s.ok()); + ASSERT_OK(restore_db_->DeleteBackup(2)); + CloseRestoreDB(); + AssertBackupConsistency(0, 0, keys_iteration * 1, keys_iteration * 5); + + // new backup should be 2! OpenBackupableDB(); - FillDB(db_.get(), keys_iteration * 3, keys_iteration * 4); + FillDB(db_.get(), keys_iteration * 1, keys_iteration * 2); ASSERT_OK(db_->CreateNewBackup(!!(rnd.Next() % 2))); CloseBackupableDB(); - AssertBackupConsistency(4, 0, keys_iteration * 4, keys_iteration * 5); + AssertBackupConsistency(2, 0, keys_iteration * 2, keys_iteration * 5); } // open DB, write, close DB, backup, restore, repeat