diff --git a/db/column_family_test.cc b/db/column_family_test.cc index ac3435593..b96e66829 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -408,9 +408,15 @@ TEST(ColumnFamilyTest, WriteBatchFailure) { Open(); CreateColumnFamiliesAndReopen({"one", "two"}); WriteBatch batch; + batch.Put(handles_[0], Slice("existing"), Slice("column-family")); batch.Put(handles_[1], Slice("non-existing"), Slice("column-family")); ASSERT_OK(db_->Write(WriteOptions(), &batch)); DropColumnFamilies({1}); + WriteOptions woptions_ignore_missing_cf; + woptions_ignore_missing_cf.ignore_missing_column_families = true; + batch.Put(handles_[0], Slice("still here"), Slice("column-family")); + ASSERT_OK(db_->Write(woptions_ignore_missing_cf, &batch)); + ASSERT_EQ("column-family", Get(0, "still here")); Status s = db_->Write(WriteOptions(), &batch); ASSERT_TRUE(s.IsInvalidArgument()); Close(); diff --git a/db/db_impl.cc b/db/db_impl.cc index a123467ae..7c65e9a61 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -290,8 +290,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { return result; } +namespace { + Status SanitizeDBOptionsByCFOptions( - DBOptions* db_opts, + const DBOptions* db_opts, const std::vector& column_families) { Status s; for (auto cf : column_families) { @@ -303,7 +305,6 @@ Status SanitizeDBOptionsByCFOptions( return Status::OK(); } -namespace { CompressionType GetCompressionFlush(const Options& options) { // Compressing memtable flushes might not help unless the sequential load // optimization is used for leveled compaction. Otherwise the CPU and @@ -631,7 +632,7 @@ bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, } else if (first.file_name < second.file_name) { return false; } else { - return (first.path_id > first.path_id); + return (first.path_id > second.path_id); } } }; // namespace @@ -1301,14 +1302,20 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, WriteBatch batch; while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { - reporter.Corruption( - record.size(), Status::Corruption("log record too small")); + reporter.Corruption(record.size(), + Status::Corruption("log record too small")); continue; } WriteBatchInternal::SetContents(&batch, record); + // If column family was not found, it might mean that the WAL write + // batch references to the column family that was dropped after the + // insert. We don't want to fail the whole write batch in that case -- we + // just ignore the update. That's why we set ignore missing column families + // to true status = WriteBatchInternal::InsertInto( - &batch, column_family_memtables_.get(), true, log_number); + &batch, column_family_memtables_.get(), + true /* ignore missing column families */, log_number); MaybeIgnoreError(&status); if (!status.ok()) { @@ -1677,6 +1684,13 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, } LogFlush(options_.info_log); + { + MutexLock l(&mutex_); + // an automatic compaction that has been scheduled might have been + // preempted by the manual compactions. Need to schedule it back. + MaybeScheduleFlushOrCompaction(); + } + return s; } @@ -1864,18 +1878,15 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, bg_cv_.Wait(); } else { manual_compaction_ = &manual; - MaybeScheduleFlushOrCompaction(); + assert(bg_compaction_scheduled_ == 0); + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); } } assert(!manual.in_progress); assert(bg_manual_only_ > 0); --bg_manual_only_; - if (bg_manual_only_ == 0) { - // an automatic compaction should have been scheduled might have be - // preempted by the manual compactions. Need to schedule it back. - MaybeScheduleFlushOrCompaction(); - } return manual.status; } @@ -1963,11 +1974,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { // Schedule BGWorkCompaction if there's a compaction pending (or a memtable // flush, but the HIGH pool is not enabled) - // Do it only if max_background_compactions hasn't been reached and, in case - // bg_manual_only_ > 0, if it's a manual compaction. - if ((manual_compaction_ || is_compaction_needed || - (is_flush_pending && options_.max_background_flushes == 0)) && - (!bg_manual_only_ || manual_compaction_)) { + // Do it only if max_background_compactions hasn't been reached and + // bg_manual_only_ == 0 + if (!bg_manual_only_ && + (is_compaction_needed || + (is_flush_pending && options_.max_background_flushes == 0))) { if (bg_compaction_scheduled_ < options_.max_background_compactions) { bg_compaction_scheduled_++; env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); @@ -1979,7 +1990,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } void DBImpl::RecordFlushIOStats() { - RecordTick(stats_, FLUSH_WRITE_BYTES, iostats_context.bytes_written); + RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written)); IOSTATS_RESET(bytes_written); } @@ -2194,6 +2205,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (is_manual) { // another thread cannot pick up the same work manual_compaction_->in_progress = true; + } else if (manual_compaction_ != nullptr) { + // there should be no automatic compactions running when manual compaction + // is running + return Status::OK(); } // FLUSH preempts compaction @@ -2313,7 +2328,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (status.ok()) { // Done - } else if (shutting_down_.Acquire_Load()) { + } else if (status.IsShutdownInProgress()) { // Ignore compaction errors found during shutting down } else { Log(InfoLogLevel::WARN_LEVEL, options_.info_log, "Compaction error: %s", @@ -2573,6 +2588,10 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, DeletionState& deletion_state, LogBuffer* log_buffer) { + if (options_.max_background_flushes > 0) { + // flush thread will take care of this + return 0; + } if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); @@ -2626,9 +2645,29 @@ Status DBImpl::ProcessKeyValueCompaction( compaction_filter = compaction_filter_from_factory.get(); } + int64_t key_drop_user = 0; + int64_t key_drop_newer_entry = 0; + int64_t key_drop_obsolete = 0; + int64_t loop_cnt = 0; while (input->Valid() && !shutting_down_.Acquire_Load() && !cfd->IsDropped()) { - RecordCompactionIOStats(); + if (++loop_cnt > 1000) { + if (key_drop_user > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user); + key_drop_user = 0; + } + if (key_drop_newer_entry > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, + key_drop_newer_entry); + key_drop_newer_entry = 0; + } + if (key_drop_obsolete > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete); + key_drop_obsolete = 0; + } + RecordCompactionIOStats(); + loop_cnt = 0; + } // FLUSH preempts compaction // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on @@ -2709,7 +2748,7 @@ Status DBImpl::ProcessKeyValueCompaction( ParseInternalKey(key, &ikey); // no value associated with delete value.clear(); - RecordTick(stats_, COMPACTION_KEY_DROP_USER); + ++key_drop_user; } else if (value_changed) { value = compaction_filter_value; } @@ -2733,7 +2772,7 @@ Status DBImpl::ProcessKeyValueCompaction( // TODO: why not > ? assert(last_sequence_for_key >= ikey.sequence); drop = true; // (A) - RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY); + ++key_drop_newer_entry; } else if (ikey.type == kTypeDeletion && ikey.sequence <= earliest_snapshot && compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) { @@ -2745,7 +2784,7 @@ Status DBImpl::ProcessKeyValueCompaction( // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; - RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE); + ++key_drop_obsolete; } else if (ikey.type == kTypeMerge) { if (!merge.HasOperator()) { LogToBuffer(log_buffer, "Options::merge_operator is null."); @@ -2892,7 +2931,15 @@ Status DBImpl::ProcessKeyValueCompaction( input->Next(); } } - + if (key_drop_user > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user); + } + if (key_drop_newer_entry > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry); + } + if (key_drop_obsolete > 0) { + RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete); + } RecordCompactionIOStats(); return status; @@ -3367,7 +3414,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { StopWatch sw(env_, stats_, DB_GET); - PERF_TIMER_AUTO(get_snapshot_time); + PERF_TIMER_GUARD(get_snapshot_time); auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); @@ -3391,6 +3438,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); PERF_TIMER_STOP(get_snapshot_time); + if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) { // Done RecordTick(stats_, MEMTABLE_HIT); @@ -3398,20 +3446,19 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Done RecordTick(stats_, MEMTABLE_HIT); } else { - PERF_TIMER_START(get_from_output_files_time); - + PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(options, lkey, value, &s, &merge_context, value_found); - PERF_TIMER_STOP(get_from_output_files_time); RecordTick(stats_, MEMTABLE_MISS); } - PERF_TIMER_START(get_post_process_time); + { + PERF_TIMER_GUARD(get_post_process_time); - ReturnAndCleanupSuperVersion(cfd, sv); + ReturnAndCleanupSuperVersion(cfd, sv); - RecordTick(stats_, NUMBER_KEYS_READ); - RecordTick(stats_, BYTES_READ, value->size()); - PERF_TIMER_STOP(get_post_process_time); + RecordTick(stats_, NUMBER_KEYS_READ); + RecordTick(stats_, BYTES_READ, value->size()); + } return s; } @@ -3421,7 +3468,7 @@ std::vector DBImpl::MultiGet( const std::vector& keys, std::vector* values) { StopWatch sw(env_, stats_, DB_MULTIGET); - PERF_TIMER_AUTO(get_snapshot_time); + PERF_TIMER_GUARD(get_snapshot_time); SequenceNumber snapshot; @@ -3497,7 +3544,7 @@ std::vector DBImpl::MultiGet( } // Post processing (decrement reference counts and record statistics) - PERF_TIMER_START(get_post_process_time); + PERF_TIMER_GUARD(get_post_process_time); autovector superversions_to_delete; // TODO(icanadi) do we need lock here or just around Cleanup()? @@ -3870,7 +3917,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } - PERF_TIMER_AUTO(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_pre_and_post_process_time); Writer w(&mutex_); w.batch = my_batch; w.sync = options.sync; @@ -4003,7 +4050,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { uint64_t log_size = 0; if (!options.disableWAL) { - PERF_TIMER_START(write_wal_time); + PERF_TIMER_GUARD(write_wal_time); Slice log_entry = WriteBatchInternal::Contents(updates); status = log_->AddRecord(log_entry); total_log_size_ += log_entry.size(); @@ -4021,13 +4068,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { status = log_->file()->Sync(); } } - PERF_TIMER_STOP(write_wal_time); } if (status.ok()) { - PERF_TIMER_START(write_memtable_time); + PERF_TIMER_GUARD(write_memtable_time); status = WriteBatchInternal::InsertInto( - updates, column_family_memtables_.get(), false, 0, this, false); + updates, column_family_memtables_.get(), + options.ignore_missing_column_families, 0, this, false); // A non-OK status here indicates iteration failure (either in-memory // writebatch corruption (very bad), or the client specified invalid // column family). This will later on trigger bg_error_. @@ -4036,8 +4083,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // into the memtable would result in a state that some write ops might // have succeeded in memtable but Status reports error for all writes. - PERF_TIMER_STOP(write_memtable_time); - SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); } PERF_TIMER_START(write_pre_and_post_process_time); @@ -4071,7 +4116,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(stats_, WRITE_TIMEDOUT); } - PERF_TIMER_STOP(write_pre_and_post_process_time); return status; } @@ -4759,11 +4803,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; - Status s = SanitizeDBOptionsByCFOptions(&db_options, column_families); - if (!s.ok()) { - return s; - } - s = DB::Open(db_options, dbname, column_families, &handles, dbptr); + Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to @@ -4776,6 +4816,10 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr) { + Status s = SanitizeDBOptionsByCFOptions(&db_options, column_families); + if (!s.ok()) { + return s; + } if (db_options.db_paths.size() > 1) { for (auto& cfd : column_families) { if (cfd.options.compaction_style != kCompactionStyleUniversal) { @@ -4801,7 +4845,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } DBImpl* impl = new DBImpl(db_options, dbname); - Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); + s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); if (s.ok()) { for (auto db_path : impl->options_.db_paths) { s = impl->env_->CreateDirIfMissing(db_path.path); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 47c8ab33d..1dfdf422e 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -74,6 +74,8 @@ class DBImplReadOnly : public DBImpl { uint32_t target_path_id = 0) override { return Status::NotSupported("Not supported operation in read only mode."); } + +#ifndef ROCKSDB_LITE virtual Status DisableFileDeletions() override { return Status::NotSupported("Not supported operation in read only mode."); } @@ -85,6 +87,8 @@ class DBImplReadOnly : public DBImpl { bool flush_memtable = true) override { return Status::NotSupported("Not supported operation in read only mode."); } +#endif // ROCKSDB_LITE + using DBImpl::Flush; virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override { diff --git a/db/db_iter.cc b/db/db_iter.cc index 370ffd8cb..599a56a99 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -194,9 +194,8 @@ void DBIter::Next() { // NOTE: In between, saved_key_ can point to a user key that has // a delete marker inline void DBIter::FindNextUserEntry(bool skipping) { - PERF_TIMER_AUTO(find_next_user_entry_time); + PERF_TIMER_GUARD(find_next_user_entry_time); FindNextUserEntryInternal(skipping); - PERF_TIMER_STOP(find_next_user_entry_time); } // Actual implementation of DBIter::FindNextUserEntry() @@ -557,9 +556,12 @@ void DBIter::Seek(const Slice& target) { saved_key_.Clear(); // now savved_key is used to store internal key. saved_key_.SetInternalKey(target, sequence_); - PERF_TIMER_AUTO(seek_internal_seek_time); - iter_->Seek(saved_key_.GetKey()); - PERF_TIMER_STOP(seek_internal_seek_time); + + { + PERF_TIMER_GUARD(seek_internal_seek_time); + iter_->Seek(saved_key_.GetKey()); + } + if (iter_->Valid()) { direction_ = kForward; ClearSavedValue(); @@ -577,9 +579,12 @@ void DBIter::SeekToFirst() { } direction_ = kForward; ClearSavedValue(); - PERF_TIMER_AUTO(seek_internal_seek_time); - iter_->SeekToFirst(); - PERF_TIMER_STOP(seek_internal_seek_time); + + { + PERF_TIMER_GUARD(seek_internal_seek_time); + iter_->SeekToFirst(); + } + if (iter_->Valid()) { FindNextUserEntry(false /* not skipping */); } else { @@ -595,9 +600,11 @@ void DBIter::SeekToLast() { } direction_ = kReverse; ClearSavedValue(); - PERF_TIMER_AUTO(seek_internal_seek_time); - iter_->SeekToLast(); - PERF_TIMER_STOP(seek_internal_seek_time); + + { + PERF_TIMER_GUARD(seek_internal_seek_time); + iter_->SeekToLast(); + } PrevInternal(); } diff --git a/db/db_test.cc b/db/db_test.cc index f1933059d..6295f5921 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7554,7 +7554,7 @@ TEST(DBTest, SimpleWriteTimeoutTest) { ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); // As the only two write buffers are full in this moment, the third // Put is expected to be timed-out. - write_opt.timeout_hint_us = 300; + write_opt.timeout_hint_us = 50; ASSERT_TRUE( Put(Key(3), Key(3) + std::string(100000, 'v'), write_opt).IsTimedOut()); } diff --git a/db/dbformat.cc b/db/dbformat.cc index e53d16dc1..baeb86802 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -127,26 +127,6 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const { } } -const char* InternalFilterPolicy::Name() const { - return user_policy_->Name(); -} - -void InternalFilterPolicy::CreateFilter(const Slice* keys, int n, - std::string* dst) const { - // We rely on the fact that the code in table.cc does not mind us - // adjusting keys[]. - Slice* mkey = const_cast(keys); - for (int i = 0; i < n; i++) { - mkey[i] = ExtractUserKey(keys[i]); - // TODO(sanjay): Suppress dups? - } - user_policy_->CreateFilter(keys, n, dst); -} - -bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { - return user_policy_->KeyMayMatch(ExtractUserKey(key), f); -} - LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { size_t usize = user_key.size(); size_t needed = usize + 13; // A conservative estimate diff --git a/db/dbformat.h b/db/dbformat.h index 4dab5f196..eb5d8ed53 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -124,19 +124,6 @@ class InternalKeyComparator : public Comparator { int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const; }; -// Filter policy wrapper that converts from internal keys to user keys -class InternalFilterPolicy : public FilterPolicy { - private: - std::shared_ptr shared_ptr_; - const FilterPolicy* const user_policy_; - public: - explicit InternalFilterPolicy(std::shared_ptr p) - : shared_ptr_(p), user_policy_(p.get()) {} - virtual const char* Name() const; - virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const; - virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const; -}; - // Modules in this directory should keep internal keys wrapped inside // the following class instead of plain strings so that we do not // incorrectly use string comparisons instead of an InternalKeyComparator. diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 79cc953cf..74e6dd249 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -6,9 +6,10 @@ #ifndef ROCKSDB_LITE #include "db/forward_iterator.h" +#include #include #include -#include + #include "db/db_impl.h" #include "db/db_iter.h" #include "db/column_family.h" @@ -37,12 +38,16 @@ class LevelIterator : public Iterator { assert(file_index < files_.size()); if (file_index != file_index_) { file_index_ = file_index; - file_iter_.reset(cfd_->table_cache()->NewIterator( - read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), - files_[file_index_]->fd, nullptr /* table_reader_ptr */, false)); + Reset(); } valid_ = false; } + void Reset() { + assert(file_index_ < files_.size()); + file_iter_.reset(cfd_->table_cache()->NewIterator( + read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), + files_[file_index_]->fd, nullptr /* table_reader_ptr */, false)); + } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); valid_ = false; @@ -63,12 +68,15 @@ class LevelIterator : public Iterator { assert(file_iter_ != nullptr); file_iter_->Seek(internal_key); valid_ = file_iter_->Valid(); - assert(valid_); } void Next() override { assert(valid_); file_iter_->Next(); - while (!file_iter_->Valid()) { + for (;;) { + if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) { + valid_ = !file_iter_->status().IsIncomplete(); + return; + } if (file_index_ + 1 >= files_.size()) { valid_ = false; return; @@ -76,7 +84,6 @@ class LevelIterator : public Iterator { SetFileIndex(file_index_ + 1); file_iter_->SeekToFirst(); } - valid_ = file_iter_->Valid(); } Slice key() const override { assert(valid_); @@ -160,6 +167,8 @@ void ForwardIterator::SeekToFirst() { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); + } else if (status_.IsIncomplete()) { + ResetIncompleteIterators(); } SeekInternal(Slice(), true); } @@ -168,6 +177,8 @@ void ForwardIterator::Seek(const Slice& internal_key) { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); + } else if (status_.IsIncomplete()) { + ResetIncompleteIterators(); } SeekInternal(internal_key, false); } @@ -211,7 +222,15 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } l0_iters_[i]->Seek(internal_key); } - if (l0_iters_[i]->Valid()) { + + if (l0_iters_[i]->status().IsIncomplete()) { + // if any of the immutable iterators is incomplete (no-io option was + // used), we are unable to reliably find the smallest key + assert(read_options_.read_tier == kBlockCacheTier); + status_ = l0_iters_[i]->status(); + valid_ = false; + return; + } else if (l0_iters_[i]->Valid()) { immutable_min_heap_.push(l0_iters_[i]); } } @@ -280,7 +299,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, level_iters_[level - 1]->SetFileIndex(f_idx); seek_to_first ? level_iters_[level - 1]->SeekToFirst() : level_iters_[level - 1]->Seek(internal_key); - if (level_iters_[level - 1]->Valid()) { + + if (level_iters_[level - 1]->status().IsIncomplete()) { + // see above + assert(read_options_.read_tier == kBlockCacheTier); + status_ = level_iters_[level - 1]->status(); + valid_ = false; + return; + } else if (level_iters_[level - 1]->Valid()) { immutable_min_heap_.push(level_iters_[level - 1]); } } @@ -304,7 +330,7 @@ void ForwardIterator::Next() { assert(valid_); if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + sv_->version_number != cfd_->GetSuperVersionNumber()) { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); @@ -320,9 +346,17 @@ void ForwardIterator::Next() { } current_->Next(); - if (current_->Valid() && current_ != mutable_iter_) { - immutable_min_heap_.push(current_); + if (current_ != mutable_iter_) { + if (current_->status().IsIncomplete()) { + assert(read_options_.read_tier == kBlockCacheTier); + status_ = current_->status(); + valid_ = false; + return; + } else if (current_->Valid()) { + immutable_min_heap_.push(current_); + } } + UpdateCurrent(); } @@ -389,6 +423,29 @@ void ForwardIterator::RebuildIterators() { is_prev_set_ = false; } +void ForwardIterator::ResetIncompleteIterators() { + const auto& l0_files = sv_->current->files_[0]; + for (uint32_t i = 0; i < l0_iters_.size(); ++i) { + assert(i < l0_files.size()); + if (!l0_iters_[i]->status().IsIncomplete()) { + continue; + } + delete l0_iters_[i]; + l0_iters_[i] = cfd_->table_cache()->NewIterator( + read_options_, *cfd_->soptions(), cfd_->internal_comparator(), + l0_files[i]->fd); + } + + for (auto* level_iter : level_iters_) { + if (level_iter && level_iter->status().IsIncomplete()) { + level_iter->Reset(); + } + } + + current_ = nullptr; + is_prev_set_ = false; +} + void ForwardIterator::UpdateCurrent() { if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) { current_ = nullptr; @@ -417,7 +474,7 @@ void ForwardIterator::UpdateCurrent() { } bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { - if (!is_prev_set_) { + if (!valid_ || !is_prev_set_) { return true; } Slice prev_key = prev_key_.GetKey(); diff --git a/db/forward_iterator.h b/db/forward_iterator.h index d539ae3c7..bbf423a50 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -73,6 +73,7 @@ class ForwardIterator : public Iterator { private: void Cleanup(); void RebuildIterators(); + void ResetIncompleteIterators(); void SeekInternal(const Slice& internal_key, bool seek_to_first); void UpdateCurrent(); bool NeedToSeekImmutable(const Slice& internal_key); diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 34eb99781..3142d13b3 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -257,9 +257,11 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, cfd_->imm()->current()->GetTotalNumEntries() + current->GetEstimatedActiveKeys(); return true; +#ifndef ROCKSDB_LITE case kIsFileDeletionEnabled: *value = db->IsFileDeletionsEnabled(); return true; +#endif default: return false; } diff --git a/db/memtable.cc b/db/memtable.cc index 523998c30..e9e7051c7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -422,7 +422,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // Avoiding recording stats for speed. return false; } - PERF_TIMER_AUTO(get_from_memtable_time); + PERF_TIMER_GUARD(get_from_memtable_time); Slice user_key = key.user_key(); bool found_final_value = false; @@ -452,7 +452,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, if (!found_final_value && merge_in_progress) { *s = Status::MergeInProgress(""); } - PERF_TIMER_STOP(get_from_memtable_time); PERF_COUNTER_ADD(get_from_memtable_count, 1); return found_final_value; } diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index 3a5809774..e88485070 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -556,7 +556,7 @@ public: WritableFile* file, CompressionType compression_type) const; - virtual Status SanitizeDBOptions(DBOptions* db_opts) const override { + virtual Status SanitizeDBOptions(const DBOptions* db_opts) const override { return Status::OK(); } diff --git a/db/write_batch.cc b/db/write_batch.cc index fdc0e2c6e..bfa5e3f6f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -299,17 +299,17 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; ColumnFamilyMemTables* cf_mems_; - bool recovery_; + bool ignore_missing_column_families_; uint64_t log_number_; DBImpl* db_; const bool dont_filter_deletes_; MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, - bool recovery, uint64_t log_number, DB* db, - const bool dont_filter_deletes) + bool ignore_missing_column_families, uint64_t log_number, + DB* db, const bool dont_filter_deletes) : sequence_(sequence), cf_mems_(cf_mems), - recovery_(recovery), + ignore_missing_column_families_(ignore_missing_column_families), log_number_(log_number), db_(reinterpret_cast(db)), dont_filter_deletes_(dont_filter_deletes) { @@ -321,12 +321,18 @@ class MemTableInserter : public WriteBatch::Handler { bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { bool found = cf_mems_->Seek(column_family_id); - if (recovery_ && (!found || log_number_ < cf_mems_->GetLogNumber())) { - // if in recovery envoronment: - // * If column family was not found, it might mean that the WAL write - // batch references to the column family that was dropped after the - // insert. We don't want to fail the whole write batch in that case -- we - // just ignore the update. + if (!found) { + if (ignore_missing_column_families_) { + *s = Status::OK(); + } else { + *s = Status::InvalidArgument( + "Invalid column family specified in write batch"); + } + return false; + } + if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) { + // This is true only in recovery environment (log_number_ is always 0 in + // non-recovery, regular write code-path) // * If log_number_ < cf_mems_->GetLogNumber(), this means that column // family already contains updates from this log. We can't apply updates // twice because of update-in-place or merge workloads -- ignore the @@ -334,18 +340,8 @@ class MemTableInserter : public WriteBatch::Handler { *s = Status::OK(); return false; } - if (!found) { - assert(!recovery_); - // If the column family was not found in non-recovery enviornment - // (client's write code-path), we have to fail the write and return - // the failure status to the client. - *s = Status::InvalidArgument( - "Invalid column family specified in write batch"); - return false; - } return true; } - virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { Status seek_status; @@ -503,10 +499,12 @@ class MemTableInserter : public WriteBatch::Handler { Status WriteBatchInternal::InsertInto(const WriteBatch* b, ColumnFamilyMemTables* memtables, - bool recovery, uint64_t log_number, - DB* db, const bool dont_filter_deletes) { + bool ignore_missing_column_families, + uint64_t log_number, DB* db, + const bool dont_filter_deletes) { MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, - recovery, log_number, db, dont_filter_deletes); + ignore_missing_column_families, log_number, db, + dont_filter_deletes); return b->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 9a191f4cb..615a47f5e 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -106,18 +106,18 @@ class WriteBatchInternal { // Inserts batch entries into memtable // If dont_filter_deletes is false AND options.filter_deletes is true, // then --> Drops deletes in batch if db->KeyMayExist returns false - // If recovery == true, this means InsertInto is executed on a recovery - // code-path. WriteBatch referencing a dropped column family can be - // found on a recovery code-path and should be ignored (recovery should not - // fail). Additionally, the memtable will be updated only if + // If ignore_missing_column_families == true. WriteBatch referencing + // non-existing column family should be ignored. + // However, if ignore_missing_column_families == false, any WriteBatch + // referencing non-existing column family will return a InvalidArgument() + // failure. + // + // If log_number is non-zero, the memtable will be updated only if // memtables->GetLogNumber() >= log_number - // However, if recovery == false, any WriteBatch referencing - // non-existing column family will return a failure. Also, log_number is - // ignored in that case static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, - bool recovery = false, uint64_t log_number = 0, - DB* db = nullptr, + bool ignore_missing_column_families = false, + uint64_t log_number = 0, DB* db = nullptr, const bool dont_filter_deletes = true); static void Append(WriteBatch* dst, const WriteBatch* src); diff --git a/include/rocksdb/iostats_context.h b/include/rocksdb/iostats_context.h index 0a220b53a..e06ee1773 100644 --- a/include/rocksdb/iostats_context.h +++ b/include/rocksdb/iostats_context.h @@ -27,7 +27,9 @@ struct IOStatsContext { uint64_t bytes_read; }; +#ifndef IOS_CROSS_COMPILE extern __thread IOStatsContext iostats_context; +#endif // IOS_CROSS_COMPILE } // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 3569409c4..0ca303344 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -959,7 +959,17 @@ struct WriteOptions { // Default: 0 uint64_t timeout_hint_us; - WriteOptions() : sync(false), disableWAL(false), timeout_hint_us(0) {} + // If true and if user is trying to write to column families that don't exist + // (they were dropped), ignore the write (don't return an error). If there + // are multiple writes in a WriteBatch, other writes will succeed. + // Default: false + bool ignore_missing_column_families; + + WriteOptions() + : sync(false), + disableWAL(false), + timeout_hint_us(0), + ignore_missing_column_families(false) {} }; // Options that control flush operations diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index b20689a77..d13ff9d81 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -96,7 +96,7 @@ class Status { // Returns true iff the status indicates Incomplete bool IsIncomplete() const { return code() == kIncomplete; } - // Returns true iff the status indicates Incomplete + // Returns true iff the status indicates Shutdown In progress bool IsShutdownInProgress() const { return code() == kShutdownInProgress; } bool IsTimedOut() const { return code() == kTimedOut; } diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index 66556e7ed..0f8b41074 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -227,15 +227,46 @@ extern TableFactory* NewPlainTableFactory(const PlainTableOptions& options = PlainTableOptions()); struct CuckooTablePropertyNames { + // The key that is used to fill empty buckets. static const std::string kEmptyKey; + // Fixed length of value. static const std::string kValueLength; - static const std::string kNumHashTable; - static const std::string kMaxNumBuckets; + // Number of hash functions used in Cuckoo Hash. + static const std::string kNumHashFunc; + // It denotes the number of buckets in a Cuckoo Block. Given a key and a + // particular hash function, a Cuckoo Block is a set of consecutive buckets, + // where starting bucket id is given by the hash function on the key. In case + // of a collision during inserting the key, the builder tries to insert the + // key in other locations of the cuckoo block before using the next hash + // function. This reduces cache miss during read operation in case of + // collision. + static const std::string kCuckooBlockSize; + // Size of the hash table. Use this number to compute the modulo of hash + // function. The actual number of buckets will be kMaxHashTableSize + + // kCuckooBlockSize - 1. The last kCuckooBlockSize-1 buckets are used to + // accommodate the Cuckoo Block from end of hash table, due to cache friendly + // implementation. + static const std::string kHashTableSize; + // Denotes if the key sorted in the file is Internal Key (if false) + // or User Key only (if true). static const std::string kIsLastLevel; }; +// Cuckoo Table Factory for SST table format using Cache Friendly Cuckoo Hashing +// @hash_table_ratio: Determines the utilization of hash tables. Smaller values +// result in larger hash tables with fewer collisions. +// @max_search_depth: A property used by builder to determine the depth to go to +// to search for a path to displace elements in case of +// collision. See Builder.MakeSpaceForKey method. Higher +// values result in more efficient hash tables with fewer +// lookups but take more time to build. +// @cuckoo_block_size: In case of collision while inserting, the builder +// attempts to insert in the next cuckoo_block_size +// locations before skipping over to the next Cuckoo hash +// function. This makes lookups more cache friendly in case +// of collisions. extern TableFactory* NewCuckooTableFactory(double hash_table_ratio = 0.9, - uint32_t max_search_depth = 100); + uint32_t max_search_depth = 100, uint32_t cuckoo_block_size = 5); #endif // ROCKSDB_LITE @@ -300,7 +331,7 @@ class TableFactory { // // If the function cannot find a way to sanitize the input DB Options, // a non-ok Status will be returned. - virtual Status SanitizeDBOptions(DBOptions* db_opts) const = 0; + virtual Status SanitizeDBOptions(const DBOptions* db_opts) const = 0; // Return a string that contains printable format of table configurations. // RocksDB prints configurations at DB Open(). diff --git a/table/adaptive_table_factory.h b/table/adaptive_table_factory.h index 571e07498..f119d97b1 100644 --- a/table/adaptive_table_factory.h +++ b/table/adaptive_table_factory.h @@ -43,7 +43,7 @@ class AdaptiveTableFactory : public TableFactory { override; // Sanitizes the specified DB Options. - Status SanitizeDBOptions(DBOptions* db_opts) const override { + Status SanitizeDBOptions(const DBOptions* db_opts) const override { if (db_opts->allow_mmap_reads == false) { return Status::NotSupported( "AdaptiveTable with allow_mmap_reads == false is not supported."); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index ecb176a97..5d0fc9988 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -116,7 +116,7 @@ class ShortenedIndexBuilder : public IndexBuilder { public: explicit ShortenedIndexBuilder(const Comparator* comparator) : IndexBuilder(comparator), - index_block_builder_(1 /* block_restart_interval == 1 */, comparator) {} + index_block_builder_(1 /* block_restart_interval == 1 */) {} virtual void AddIndexEntry(std::string* last_key_in_current_block, const Slice* first_key_in_next_block, @@ -420,7 +420,7 @@ struct BlockBasedTableBuilder::Rep { table_options(table_opt), internal_comparator(icomparator), file(f), - data_block(table_options.block_restart_interval, &internal_comparator), + data_block(table_options.block_restart_interval), internal_prefix_transform(options.prefix_extractor.get()), index_builder(CreateIndexBuilder( table_options.index_type, &internal_comparator, @@ -492,7 +492,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { } if (r->filter_block != nullptr) { - r->filter_block->AddKey(key); + r->filter_block->AddKey(ExtractUserKey(key)); } r->last_key.assign(key.data(), key.size()); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 0f7863e8d..de30fb383 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -38,10 +38,6 @@ BlockBasedTableFactory::BlockBasedTableFactory( table_options_.block_size_deviation > 100) { table_options_.block_size_deviation = 0; } - if (table_options_.filter_policy) { - auto* p = new InternalFilterPolicy(table_options_.filter_policy); - table_options_.filter_policy.reset(p); - } } Status BlockBasedTableFactory::NewTableReader( diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index 90282bf9d..d7045346a 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -45,7 +45,7 @@ class BlockBasedTableFactory : public TableFactory { WritableFile* file, CompressionType compression_type) const override; // Sanitizes the specified DB Options. - Status SanitizeDBOptions(DBOptions* db_opts) const override { + Status SanitizeDBOptions(const DBOptions* db_opts) const override { return Status::OK(); } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 3c0ef527e..0be38a1dc 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1067,9 +1067,8 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { s = handle.DecodeFrom(&handle_value); assert(s.ok()); auto filter_entry = GetFilter(true /* no io */); - may_match = - filter_entry.value == nullptr || - filter_entry.value->PrefixMayMatch(handle.offset(), internal_prefix); + may_match = filter_entry.value == nullptr || + filter_entry.value->PrefixMayMatch(handle.offset(), prefix); filter_entry.Release(rep_->table_options.block_cache.get()); } @@ -1105,9 +1104,8 @@ Status BlockBasedTable::Get( BlockHandle handle; bool may_not_exist_in_filter = - filter != nullptr && - handle.DecodeFrom(&handle_value).ok() && - !filter->KeyMayMatch(handle.offset(), key); + filter != nullptr && handle.DecodeFrom(&handle_value).ok() && + !filter->KeyMayMatch(handle.offset(), ExtractUserKey(key)); if (may_not_exist_in_filter) { // Not found diff --git a/table/block_builder.cc b/table/block_builder.cc index 5bac54ae7..f8627743a 100644 --- a/table/block_builder.cc +++ b/table/block_builder.cc @@ -41,10 +41,8 @@ namespace rocksdb { -BlockBuilder::BlockBuilder(int block_restart_interval, - const Comparator* comparator) +BlockBuilder::BlockBuilder(int block_restart_interval) : block_restart_interval_(block_restart_interval), - comparator_(comparator), restarts_(), counter_(0), finished_(false) { @@ -96,8 +94,6 @@ void BlockBuilder::Add(const Slice& key, const Slice& value) { Slice last_key_piece(last_key_); assert(!finished_); assert(counter_ <= block_restart_interval_); - assert(buffer_.empty() // No values yet? - || comparator_->Compare(key, last_key_piece) > 0); size_t shared = 0; if (counter_ < block_restart_interval_) { // See how much sharing to do with previous string diff --git a/table/block_builder.h b/table/block_builder.h index eb7c49f7d..3b5b2b444 100644 --- a/table/block_builder.h +++ b/table/block_builder.h @@ -22,7 +22,7 @@ class BlockBuilder { BlockBuilder(const BlockBuilder&) = delete; void operator=(const BlockBuilder&) = delete; - BlockBuilder(int block_restart_interval, const Comparator* comparator); + explicit BlockBuilder(int block_restart_interval); // Reset the contents as if the BlockBuilder was just constructed. void Reset(); @@ -50,7 +50,6 @@ class BlockBuilder { private: const int block_restart_interval_; - const Comparator* comparator_; std::string buffer_; // Destination buffer std::vector restarts_; // Restart points diff --git a/table/block_test.cc b/table/block_test.cc index da01d6def..b36787f8f 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -76,7 +76,7 @@ TEST(BlockTest, SimpleTest) { std::vector keys; std::vector values; - BlockBuilder builder(16, ic.get()); + BlockBuilder builder(16); int num_records = 100000; GenerateRandomKVs(&keys, &values, 0, num_records); @@ -132,8 +132,7 @@ BlockContents GetBlockContents(std::unique_ptr *builder, const std::vector &keys, const std::vector &values, const int prefix_group_size = 1) { - builder->reset( - new BlockBuilder(1 /* restart interval */, BytewiseComparator())); + builder->reset(new BlockBuilder(1 /* restart interval */)); // Add only half of the keys for (size_t i = 0; i < keys.size(); ++i) { diff --git a/table/cuckoo_table_builder.cc b/table/cuckoo_table_builder.cc index 9e02bb04e..6326d3787 100644 --- a/table/cuckoo_table_builder.cc +++ b/table/cuckoo_table_builder.cc @@ -16,6 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/table.h" #include "table/block_builder.h" +#include "table/cuckoo_table_factory.h" #include "table/format.h" #include "table/meta_blocks.h" #include "util/autovector.h" @@ -24,28 +25,32 @@ namespace rocksdb { const std::string CuckooTablePropertyNames::kEmptyKey = "rocksdb.cuckoo.bucket.empty.key"; -const std::string CuckooTablePropertyNames::kNumHashTable = +const std::string CuckooTablePropertyNames::kNumHashFunc = "rocksdb.cuckoo.hash.num"; -const std::string CuckooTablePropertyNames::kMaxNumBuckets = - "rocksdb.cuckoo.bucket.maxnum"; +const std::string CuckooTablePropertyNames::kHashTableSize = + "rocksdb.cuckoo.hash.size"; const std::string CuckooTablePropertyNames::kValueLength = "rocksdb.cuckoo.value.length"; const std::string CuckooTablePropertyNames::kIsLastLevel = "rocksdb.cuckoo.file.islastlevel"; +const std::string CuckooTablePropertyNames::kCuckooBlockSize = + "rocksdb.cuckoo.hash.cuckooblocksize"; // Obtained by running echo rocksdb.table.cuckoo | sha1sum extern const uint64_t kCuckooTableMagicNumber = 0x926789d0c5f17873ull; CuckooTableBuilder::CuckooTableBuilder( - WritableFile* file, double hash_table_ratio, + WritableFile* file, double max_hash_table_ratio, uint32_t max_num_hash_table, uint32_t max_search_depth, - const Comparator* user_comparator, + const Comparator* user_comparator, uint32_t cuckoo_block_size, uint64_t (*get_slice_hash)(const Slice&, uint32_t, uint64_t)) - : num_hash_table_(2), + : num_hash_func_(2), file_(file), - hash_table_ratio_(hash_table_ratio), - max_num_hash_table_(max_num_hash_table), + max_hash_table_ratio_(max_hash_table_ratio), + max_num_hash_func_(max_num_hash_table), max_search_depth_(max_search_depth), + cuckoo_block_size_(std::max(1U, cuckoo_block_size)), + hash_table_size_(2), is_last_level_file_(false), has_seen_first_key_(false), ucomp_(user_comparator), @@ -86,7 +91,6 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) { } else { kvs_.emplace_back(std::make_pair(key.ToString(), value.ToString())); } - properties_.num_entries++; // In order to fill the empty buckets in the hash table, we identify a // key which is not used so far (unused_user_key). We determine this by @@ -98,11 +102,14 @@ void CuckooTableBuilder::Add(const Slice& key, const Slice& value) { } else if (ikey.user_key.compare(largest_user_key_) > 0) { largest_user_key_.assign(ikey.user_key.data(), ikey.user_key.size()); } + if (hash_table_size_ < kvs_.size() / max_hash_table_ratio_) { + hash_table_size_ *= 2; + } } Status CuckooTableBuilder::MakeHashTable(std::vector* buckets) { - uint64_t num_buckets = kvs_.size() / hash_table_ratio_; - buckets->resize(num_buckets); + uint64_t hash_table_size_minus_one = hash_table_size_ - 1; + buckets->resize(hash_table_size_minus_one + cuckoo_block_size_); uint64_t make_space_for_key_call_id = 0; for (uint32_t vector_idx = 0; vector_idx < kvs_.size(); vector_idx++) { uint64_t bucket_id; @@ -110,39 +117,50 @@ Status CuckooTableBuilder::MakeHashTable(std::vector* buckets) { autovector hash_vals; Slice user_key = is_last_level_file_ ? kvs_[vector_idx].first : ExtractUserKey(kvs_[vector_idx].first); - for (uint32_t hash_cnt = 0; hash_cnt < num_hash_table_; ++hash_cnt) { - uint64_t hash_val = get_slice_hash_(user_key, hash_cnt, num_buckets); - if ((*buckets)[hash_val].vector_idx == kMaxVectorIdx) { - bucket_id = hash_val; - bucket_found = true; - break; - } else { - if (ucomp_->Compare(user_key, is_last_level_file_ - ? Slice(kvs_[(*buckets)[hash_val].vector_idx].first) - : ExtractUserKey( - kvs_[(*buckets)[hash_val].vector_idx].first)) == 0) { - return Status::NotSupported("Same key is being inserted again."); + for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_ && !bucket_found; + ++hash_cnt) { + uint64_t hash_val = CuckooHash(user_key, hash_cnt, + hash_table_size_minus_one, get_slice_hash_); + // If there is a collision, check next cuckoo_block_size_ locations for + // empty locations. While checking, if we reach end of the hash table, + // stop searching and proceed for next hash function. + for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_; + ++block_idx, ++hash_val) { + if ((*buckets)[hash_val].vector_idx == kMaxVectorIdx) { + bucket_id = hash_val; + bucket_found = true; + break; + } else { + if (ucomp_->Compare(user_key, is_last_level_file_ + ? Slice(kvs_[(*buckets)[hash_val].vector_idx].first) + : ExtractUserKey( + kvs_[(*buckets)[hash_val].vector_idx].first)) == 0) { + return Status::NotSupported("Same key is being inserted again."); + } + hash_vals.push_back(hash_val); } - hash_vals.push_back(hash_val); } } while (!bucket_found && !MakeSpaceForKey(hash_vals, ++make_space_for_key_call_id, buckets, &bucket_id)) { // Rehash by increashing number of hash tables. - if (num_hash_table_ >= max_num_hash_table_) { - return Status::NotSupported("Too many collissions. Unable to hash."); + if (num_hash_func_ >= max_num_hash_func_) { + return Status::NotSupported("Too many collisions. Unable to hash."); } // We don't really need to rehash the entire table because old hashes are // still valid and we only increased the number of hash functions. - uint64_t hash_val = get_slice_hash_(user_key, - num_hash_table_, num_buckets); - ++num_hash_table_; - if ((*buckets)[hash_val].vector_idx == kMaxVectorIdx) { - bucket_found = true; - bucket_id = hash_val; - break; - } else { - hash_vals.push_back(hash_val); + uint64_t hash_val = CuckooHash(user_key, num_hash_func_, + hash_table_size_minus_one, get_slice_hash_); + ++num_hash_func_; + for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_; + ++block_idx, ++hash_val) { + if ((*buckets)[hash_val].vector_idx == kMaxVectorIdx) { + bucket_found = true; + bucket_id = hash_val; + break; + } else { + hash_vals.push_back(hash_val); + } } } (*buckets)[bucket_id].vector_idx = vector_idx; @@ -154,13 +172,14 @@ Status CuckooTableBuilder::Finish() { assert(!closed_); closed_ = true; std::vector buckets; - Status s = MakeHashTable(&buckets); - if (!s.ok()) { - return s; - } - // Determine unused_user_key to fill empty buckets. + Status s; std::string unused_bucket; if (!kvs_.empty()) { + s = MakeHashTable(&buckets); + if (!s.ok()) { + return s; + } + // Determine unused_user_key to fill empty buckets. std::string unused_user_key = smallest_user_key_; int curr_pos = unused_user_key.size() - 1; while (curr_pos >= 0) { @@ -192,6 +211,7 @@ Status CuckooTableBuilder::Finish() { AppendInternalKey(&unused_bucket, ikey); } } + properties_.num_entries = kvs_.size(); properties_.fixed_key_len = unused_bucket.size(); uint32_t value_length = kvs_.empty() ? 0 : kvs_[0].second.size(); uint32_t bucket_size = value_length + properties_.fixed_key_len; @@ -226,16 +246,22 @@ Status CuckooTableBuilder::Finish() { properties_.user_collected_properties[ CuckooTablePropertyNames::kEmptyKey] = unused_bucket; properties_.user_collected_properties[ - CuckooTablePropertyNames::kNumHashTable].assign( - reinterpret_cast(&num_hash_table_), sizeof(num_hash_table_)); - uint64_t num_buckets = buckets.size(); + CuckooTablePropertyNames::kNumHashFunc].assign( + reinterpret_cast(&num_hash_func_), sizeof(num_hash_func_)); + + uint64_t hash_table_size = buckets.size() - cuckoo_block_size_ + 1; properties_.user_collected_properties[ - CuckooTablePropertyNames::kMaxNumBuckets].assign( - reinterpret_cast(&num_buckets), sizeof(num_buckets)); + CuckooTablePropertyNames::kHashTableSize].assign( + reinterpret_cast(&hash_table_size), + sizeof(hash_table_size)); properties_.user_collected_properties[ CuckooTablePropertyNames::kIsLastLevel].assign( reinterpret_cast(&is_last_level_file_), sizeof(is_last_level_file_)); + properties_.user_collected_properties[ + CuckooTablePropertyNames::kCuckooBlockSize].assign( + reinterpret_cast(&cuckoo_block_size_), + sizeof(cuckoo_block_size_)); // Write meta blocks. MetaIndexBuilder meta_index_builder; @@ -279,7 +305,7 @@ void CuckooTableBuilder::Abandon() { } uint64_t CuckooTableBuilder::NumEntries() const { - return properties_.num_entries; + return kvs_.size(); } uint64_t CuckooTableBuilder::FileSize() const { @@ -288,11 +314,17 @@ uint64_t CuckooTableBuilder::FileSize() const { } else if (properties_.num_entries == 0) { return 0; } - // This is not the actual size of the file as we need to account for - // hash table ratio. This returns the size of filled buckets in the table - // scaled up by a factor of 1/hash_table_ratio. - return ((kvs_[0].first.size() + kvs_[0].second.size()) * - properties_.num_entries) / hash_table_ratio_; + + // Account for buckets being a power of two. + // As elements are added, file size remains constant for a while and doubles + // its size. Since compaction algorithm stops adding elements only after it + // exceeds the file limit, we account for the extra element being added here. + uint64_t expected_hash_table_size = hash_table_size_; + if (expected_hash_table_size < (kvs_.size() + 1) / max_hash_table_ratio_) { + expected_hash_table_size *= 2; + } + return (kvs_[0].first.size() + kvs_[0].second.size()) * + expected_hash_table_size; } // This method is invoked when there is no place to insert the target key. @@ -322,17 +354,19 @@ bool CuckooTableBuilder::MakeSpaceForKey( std::vector tree; // We want to identify already visited buckets in the current method call so // that we don't add same buckets again for exploration in the tree. - // We do this by maintaining a count of current method call, which acts as a - // unique id for this invocation of the method. We store this number into - // the nodes that we explore in current method call. + // We do this by maintaining a count of current method call in + // make_space_for_key_call_id, which acts as a unique id for this invocation + // of the method. We store this number into the nodes that we explore in + // current method call. // It is unlikely for the increment operation to overflow because the maximum - // no. of times this will be called is <= max_num_hash_table_ + kvs_.size(). - for (uint32_t hash_cnt = 0; hash_cnt < num_hash_table_; ++hash_cnt) { + // no. of times this will be called is <= max_num_hash_func_ + kvs_.size(). + for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_; ++hash_cnt) { uint64_t bucket_id = hash_vals[hash_cnt]; (*buckets)[bucket_id].make_space_for_key_call_id = make_space_for_key_call_id; tree.push_back(CuckooNode(bucket_id, 0, 0)); } + uint64_t hash_table_size_minus_one = hash_table_size_ - 1; bool null_found = false; uint32_t curr_pos = 0; while (!null_found && curr_pos < tree.size()) { @@ -342,22 +376,27 @@ bool CuckooTableBuilder::MakeSpaceForKey( break; } CuckooBucket& curr_bucket = (*buckets)[curr_node.bucket_id]; - for (uint32_t hash_cnt = 0; hash_cnt < num_hash_table_; ++hash_cnt) { - uint64_t child_bucket_id = get_slice_hash_( - is_last_level_file_ ? kvs_[curr_bucket.vector_idx].first - : ExtractUserKey(Slice(kvs_[curr_bucket.vector_idx].first)), - hash_cnt, buckets->size()); - if ((*buckets)[child_bucket_id].make_space_for_key_call_id == - make_space_for_key_call_id) { - continue; - } - (*buckets)[child_bucket_id].make_space_for_key_call_id = - make_space_for_key_call_id; - tree.push_back(CuckooNode(child_bucket_id, curr_depth + 1, - curr_pos)); - if ((*buckets)[child_bucket_id].vector_idx == kMaxVectorIdx) { - null_found = true; - break; + for (uint32_t hash_cnt = 0; + hash_cnt < num_hash_func_ && !null_found; ++hash_cnt) { + uint64_t child_bucket_id = CuckooHash( + (is_last_level_file_ ? kvs_[curr_bucket.vector_idx].first : + ExtractUserKey(Slice(kvs_[curr_bucket.vector_idx].first))), + hash_cnt, hash_table_size_minus_one, get_slice_hash_); + // Iterate inside Cuckoo Block. + for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_; + ++block_idx, ++child_bucket_id) { + if ((*buckets)[child_bucket_id].make_space_for_key_call_id == + make_space_for_key_call_id) { + continue; + } + (*buckets)[child_bucket_id].make_space_for_key_call_id = + make_space_for_key_call_id; + tree.push_back(CuckooNode(child_bucket_id, curr_depth + 1, + curr_pos)); + if ((*buckets)[child_bucket_id].vector_idx == kMaxVectorIdx) { + null_found = true; + break; + } } } ++curr_pos; @@ -367,10 +406,10 @@ bool CuckooTableBuilder::MakeSpaceForKey( // There is an empty node in tree.back(). Now, traverse the path from this // empty node to top of the tree and at every node in the path, replace // child with the parent. Stop when first level is reached in the tree - // (happens when 0 <= bucket_to_replace_pos < num_hash_table_) and return + // (happens when 0 <= bucket_to_replace_pos < num_hash_func_) and return // this location in first level for target key to be inserted. uint32_t bucket_to_replace_pos = tree.size()-1; - while (bucket_to_replace_pos >= num_hash_table_) { + while (bucket_to_replace_pos >= num_hash_func_) { CuckooNode& curr_node = tree[bucket_to_replace_pos]; (*buckets)[curr_node.bucket_id] = (*buckets)[tree[curr_node.parent_pos].bucket_id]; diff --git a/table/cuckoo_table_builder.h b/table/cuckoo_table_builder.h index 92f5c9cee..2bf206102 100644 --- a/table/cuckoo_table_builder.h +++ b/table/cuckoo_table_builder.h @@ -21,8 +21,9 @@ namespace rocksdb { class CuckooTableBuilder: public TableBuilder { public: CuckooTableBuilder( - WritableFile* file, double hash_table_ratio, uint32_t max_num_hash_table, - uint32_t max_search_depth, const Comparator* user_comparator, + WritableFile* file, double max_hash_table_ratio, + uint32_t max_num_hash_func, uint32_t max_search_depth, + const Comparator* user_comparator, uint32_t cuckoo_block_size, uint64_t (*get_slice_hash)(const Slice&, uint32_t, uint64_t)); // REQUIRES: Either Finish() or Abandon() has been called. @@ -60,7 +61,7 @@ class CuckooTableBuilder: public TableBuilder { CuckooBucket() : vector_idx(kMaxVectorIdx), make_space_for_key_call_id(0) {} uint32_t vector_idx; - // This number will not exceed kvs_.size() + max_num_hash_table_. + // This number will not exceed kvs_.size() + max_num_hash_func_. // We assume number of items is <= 2^32. uint32_t make_space_for_key_call_id; }; @@ -73,11 +74,13 @@ class CuckooTableBuilder: public TableBuilder { uint64_t* bucket_id); Status MakeHashTable(std::vector* buckets); - uint32_t num_hash_table_; + uint32_t num_hash_func_; WritableFile* file_; - const double hash_table_ratio_; - const uint32_t max_num_hash_table_; + const double max_hash_table_ratio_; + const uint32_t max_num_hash_func_; const uint32_t max_search_depth_; + const uint32_t cuckoo_block_size_; + uint64_t hash_table_size_; bool is_last_level_file_; Status status_; std::vector> kvs_; diff --git a/table/cuckoo_table_builder_test.cc b/table/cuckoo_table_builder_test.cc index 047f35ce1..69647d410 100644 --- a/table/cuckoo_table_builder_test.cc +++ b/table/cuckoo_table_builder_test.cc @@ -37,8 +37,9 @@ class CuckooBuilderTest { void CheckFileContents(const std::vector& keys, const std::vector& values, const std::vector& expected_locations, - std::string expected_unused_bucket, uint64_t expected_max_buckets, - uint32_t expected_num_hash_fun, bool expected_is_last_level) { + std::string expected_unused_bucket, uint64_t expected_table_size, + uint32_t expected_num_hash_func, bool expected_is_last_level, + uint32_t expected_cuckoo_block_size = 1) { // Read file unique_ptr read_file; ASSERT_OK(env_->NewRandomAccessFile(fname, &read_file, env_options_)); @@ -51,7 +52,8 @@ class CuckooBuilderTest { kCuckooTableMagicNumber, env_, nullptr, &props)); ASSERT_EQ(props->num_entries, keys.size()); ASSERT_EQ(props->fixed_key_len, keys.empty() ? 0 : keys[0].size()); - ASSERT_EQ(props->data_size, keys.size()*expected_unused_bucket.size()); + ASSERT_EQ(props->data_size, expected_unused_bucket.size() * + (expected_table_size + expected_cuckoo_block_size - 1)); ASSERT_EQ(props->raw_key_size, keys.size()*props->fixed_key_len); // Check unused bucket. @@ -65,14 +67,18 @@ class CuckooBuilderTest { CuckooTablePropertyNames::kValueLength].data()); ASSERT_EQ(values.empty() ? 0 : values[0].size(), value_len_found); ASSERT_EQ(props->raw_value_size, values.size()*value_len_found); - const uint64_t max_buckets = + const uint64_t table_size = *reinterpret_cast(props->user_collected_properties[ - CuckooTablePropertyNames::kMaxNumBuckets].data()); - ASSERT_EQ(expected_max_buckets, max_buckets); - const uint32_t num_hash_fun_found = + CuckooTablePropertyNames::kHashTableSize].data()); + ASSERT_EQ(expected_table_size, table_size); + const uint32_t num_hash_func_found = *reinterpret_cast(props->user_collected_properties[ - CuckooTablePropertyNames::kNumHashTable].data()); - ASSERT_EQ(expected_num_hash_fun, num_hash_fun_found); + CuckooTablePropertyNames::kNumHashFunc].data()); + ASSERT_EQ(expected_num_hash_func, num_hash_func_found); + const uint32_t cuckoo_block_size = + *reinterpret_cast(props->user_collected_properties[ + CuckooTablePropertyNames::kCuckooBlockSize].data()); + ASSERT_EQ(expected_cuckoo_block_size, cuckoo_block_size); const bool is_last_level_found = *reinterpret_cast(props->user_collected_properties[ CuckooTablePropertyNames::kIsLastLevel].data()); @@ -82,7 +88,7 @@ class CuckooBuilderTest { // Check contents of the bucket. std::vector keys_found(keys.size(), false); uint32_t bucket_size = expected_unused_bucket.size(); - for (uint32_t i = 0; i < max_buckets; ++i) { + for (uint32_t i = 0; i < table_size + cuckoo_block_size - 1; ++i) { Slice read_slice; ASSERT_OK(read_file->Read(i*bucket_size, bucket_size, &read_slice, nullptr)); @@ -108,6 +114,14 @@ class CuckooBuilderTest { return ikey.GetKey().ToString(); } + uint64_t NextPowOf2(uint64_t num) { + uint64_t n = 2; + while (n <= num) { + n *= 2; + } + return n; + } + Env* env_; EnvOptions env_options_; std::string fname; @@ -116,10 +130,10 @@ class CuckooBuilderTest { TEST(CuckooBuilderTest, SuccessWithEmptyFile) { unique_ptr writable_file; - fname = test::TmpDir() + "/NoCollisionFullKey"; + fname = test::TmpDir() + "/EmptyFile"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - 4, 100, BytewiseComparator(), GetSliceHash); + 4, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); @@ -146,7 +160,7 @@ TEST(CuckooBuilderTest, WriteSuccessNoCollisionFullKey) { fname = test::TmpDir() + "/NoCollisionFullKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(keys[i]), Slice(values[i])); @@ -156,11 +170,11 @@ TEST(CuckooBuilderTest, WriteSuccessNoCollisionFullKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(keys.size() / kHashTableRatio); std::string expected_unused_bucket = GetInternalKey("key00", true); expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 2, false); + expected_unused_bucket, expected_table_size, 2, false); } TEST(CuckooBuilderTest, WriteSuccessWithCollisionFullKey) { @@ -183,7 +197,7 @@ TEST(CuckooBuilderTest, WriteSuccessWithCollisionFullKey) { fname = test::TmpDir() + "/WithCollisionFullKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(keys[i]), Slice(values[i])); @@ -193,11 +207,49 @@ TEST(CuckooBuilderTest, WriteSuccessWithCollisionFullKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(keys.size() / kHashTableRatio); std::string expected_unused_bucket = GetInternalKey("key00", true); expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 4, false); + expected_unused_bucket, expected_table_size, 4, false); +} + +TEST(CuckooBuilderTest, WriteSuccessWithCollisionAndCuckooBlock) { + uint32_t num_hash_fun = 4; + std::vector user_keys = {"key01", "key02", "key03", "key04"}; + std::vector values = {"v01", "v02", "v03", "v04"}; + hash_map = { + {user_keys[0], {0, 1, 2, 3}}, + {user_keys[1], {0, 1, 2, 3}}, + {user_keys[2], {0, 1, 2, 3}}, + {user_keys[3], {0, 1, 2, 3}}, + }; + std::vector expected_locations = {0, 1, 2, 3}; + std::vector keys; + for (auto& user_key : user_keys) { + keys.push_back(GetInternalKey(user_key, false)); + } + + unique_ptr writable_file; + uint32_t cuckoo_block_size = 2; + fname = test::TmpDir() + "/WithCollisionFullKey2"; + ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); + CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, + num_hash_fun, 100, BytewiseComparator(), cuckoo_block_size, GetSliceHash); + ASSERT_OK(builder.status()); + for (uint32_t i = 0; i < user_keys.size(); i++) { + builder.Add(Slice(keys[i]), Slice(values[i])); + ASSERT_EQ(builder.NumEntries(), i + 1); + ASSERT_OK(builder.status()); + } + ASSERT_OK(builder.Finish()); + ASSERT_OK(writable_file->Close()); + + uint32_t expected_table_size = NextPowOf2(keys.size() / kHashTableRatio); + std::string expected_unused_bucket = GetInternalKey("key00", true); + expected_unused_bucket += std::string(values[0].size(), 'a'); + CheckFileContents(keys, values, expected_locations, + expected_unused_bucket, expected_table_size, 3, false, cuckoo_block_size); } TEST(CuckooBuilderTest, WithCollisionPathFullKey) { @@ -225,7 +277,46 @@ TEST(CuckooBuilderTest, WithCollisionPathFullKey) { fname = test::TmpDir() + "/WithCollisionPathFullKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); + ASSERT_OK(builder.status()); + for (uint32_t i = 0; i < user_keys.size(); i++) { + builder.Add(Slice(keys[i]), Slice(values[i])); + ASSERT_EQ(builder.NumEntries(), i + 1); + ASSERT_OK(builder.status()); + } + ASSERT_OK(builder.Finish()); + ASSERT_OK(writable_file->Close()); + + uint32_t expected_table_size = NextPowOf2(keys.size() / kHashTableRatio); + std::string expected_unused_bucket = GetInternalKey("key00", true); + expected_unused_bucket += std::string(values[0].size(), 'a'); + CheckFileContents(keys, values, expected_locations, + expected_unused_bucket, expected_table_size, 2, false); +} + +TEST(CuckooBuilderTest, WithCollisionPathFullKeyAndCuckooBlock) { + uint32_t num_hash_fun = 2; + std::vector user_keys = {"key01", "key02", "key03", + "key04", "key05"}; + std::vector values = {"v01", "v02", "v03", "v04", "v05"}; + hash_map = { + {user_keys[0], {0, 1}}, + {user_keys[1], {1, 2}}, + {user_keys[2], {3, 4}}, + {user_keys[3], {4, 5}}, + {user_keys[4], {0, 3}}, + }; + std::vector expected_locations = {2, 1, 3, 4, 0}; + std::vector keys; + for (auto& user_key : user_keys) { + keys.push_back(GetInternalKey(user_key, false)); + } + + unique_ptr writable_file; + fname = test::TmpDir() + "/WithCollisionPathFullKeyAndCuckooBlock"; + ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); + CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, + num_hash_fun, 100, BytewiseComparator(), 2, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(keys[i]), Slice(values[i])); @@ -235,11 +326,11 @@ TEST(CuckooBuilderTest, WithCollisionPathFullKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(keys.size() / kHashTableRatio); std::string expected_unused_bucket = GetInternalKey("key00", true); expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 2, false); + expected_unused_bucket, expected_table_size, 2, false, 2); } TEST(CuckooBuilderTest, WriteSuccessNoCollisionUserKey) { @@ -258,7 +349,7 @@ TEST(CuckooBuilderTest, WriteSuccessNoCollisionUserKey) { fname = test::TmpDir() + "/NoCollisionUserKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(GetInternalKey(user_keys[i], true)), Slice(values[i])); @@ -268,11 +359,11 @@ TEST(CuckooBuilderTest, WriteSuccessNoCollisionUserKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = user_keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(user_keys.size() / kHashTableRatio); std::string expected_unused_bucket = "key00"; expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(user_keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 2, true); + expected_unused_bucket, expected_table_size, 2, true); } TEST(CuckooBuilderTest, WriteSuccessWithCollisionUserKey) { @@ -291,7 +382,7 @@ TEST(CuckooBuilderTest, WriteSuccessWithCollisionUserKey) { fname = test::TmpDir() + "/WithCollisionUserKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(GetInternalKey(user_keys[i], true)), Slice(values[i])); @@ -301,11 +392,11 @@ TEST(CuckooBuilderTest, WriteSuccessWithCollisionUserKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = user_keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(user_keys.size() / kHashTableRatio); std::string expected_unused_bucket = "key00"; expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(user_keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 4, true); + expected_unused_bucket, expected_table_size, 4, true); } TEST(CuckooBuilderTest, WithCollisionPathUserKey) { @@ -326,7 +417,7 @@ TEST(CuckooBuilderTest, WithCollisionPathUserKey) { fname = test::TmpDir() + "/WithCollisionPathUserKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 2, BytewiseComparator(), GetSliceHash); + num_hash_fun, 2, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(GetInternalKey(user_keys[i], true)), Slice(values[i])); @@ -336,11 +427,11 @@ TEST(CuckooBuilderTest, WithCollisionPathUserKey) { ASSERT_OK(builder.Finish()); ASSERT_OK(writable_file->Close()); - uint32_t expected_max_buckets = user_keys.size() / kHashTableRatio; + uint32_t expected_table_size = NextPowOf2(user_keys.size() / kHashTableRatio); std::string expected_unused_bucket = "key00"; expected_unused_bucket += std::string(values[0].size(), 'a'); CheckFileContents(user_keys, values, expected_locations, - expected_unused_bucket, expected_max_buckets, 2, true); + expected_unused_bucket, expected_table_size, 2, true); } TEST(CuckooBuilderTest, FailWhenCollisionPathTooLong) { @@ -362,7 +453,7 @@ TEST(CuckooBuilderTest, FailWhenCollisionPathTooLong) { fname = test::TmpDir() + "/WithCollisionPathUserKey"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 2, BytewiseComparator(), GetSliceHash); + num_hash_fun, 2, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t i = 0; i < user_keys.size(); i++) { builder.Add(Slice(GetInternalKey(user_keys[i], false)), Slice("value")); @@ -382,7 +473,7 @@ TEST(CuckooBuilderTest, FailWhenSameKeyInserted) { fname = test::TmpDir() + "/FailWhenSameKeyInserted"; ASSERT_OK(env_->NewWritableFile(fname, &writable_file, env_options_)); CuckooTableBuilder builder(writable_file.get(), kHashTableRatio, - num_hash_fun, 100, BytewiseComparator(), GetSliceHash); + num_hash_fun, 100, BytewiseComparator(), 1, GetSliceHash); ASSERT_OK(builder.status()); builder.Add(Slice(GetInternalKey(user_key, false)), Slice("value1")); diff --git a/table/cuckoo_table_factory.cc b/table/cuckoo_table_factory.cc index 71893702b..e2cc6fd89 100644 --- a/table/cuckoo_table_factory.cc +++ b/table/cuckoo_table_factory.cc @@ -9,34 +9,14 @@ #include "db/dbformat.h" #include "table/cuckoo_table_builder.h" #include "table/cuckoo_table_reader.h" -#include "util/murmurhash.h" namespace rocksdb { -extern const uint32_t kMaxNumHashTable = 64; - -extern uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index, - uint64_t max_num_buckets) { - static constexpr uint32_t seeds[kMaxNumHashTable] = { - 816922183, 506425713, 949485004, 22513986, 421427259, 500437285, - 888981693, 847587269, 511007211, 722295391, 934013645, 566947683, - 193618736, 428277388, 770956674, 819994962, 755946528, 40807421, - 263144466, 241420041, 444294464, 731606396, 304158902, 563235655, - 968740453, 336996831, 462831574, 407970157, 985877240, 637708754, - 736932700, 205026023, 755371467, 729648411, 807744117, 46482135, - 847092855, 620960699, 102476362, 314094354, 625838942, 550889395, - 639071379, 834567510, 397667304, 151945969, 443634243, 196618243, - 421986347, 407218337, 964502417, 327741231, 493359459, 452453139, - 692216398, 108161624, 816246924, 234779764, 618949448, 496133787, - 156374056, 316589799, 982915425, 553105889 }; - return MurmurHash(s.data(), s.size(), seeds[index]) % max_num_buckets; -} - Status CuckooTableFactory::NewTableReader(const Options& options, const EnvOptions& soptions, const InternalKeyComparator& icomp, std::unique_ptr&& file, uint64_t file_size, std::unique_ptr* table) const { std::unique_ptr new_reader(new CuckooTableReader(options, - std::move(file), file_size, icomp.user_comparator(), GetSliceMurmurHash)); + std::move(file), file_size, icomp.user_comparator(), nullptr)); Status s = new_reader->status(); if (s.ok()) { *table = std::move(new_reader); @@ -47,9 +27,8 @@ Status CuckooTableFactory::NewTableReader(const Options& options, TableBuilder* CuckooTableFactory::NewTableBuilder( const Options& options, const InternalKeyComparator& internal_comparator, WritableFile* file, CompressionType compression_type) const { - return new CuckooTableBuilder(file, hash_table_ratio_, kMaxNumHashTable, - max_search_depth_, internal_comparator.user_comparator(), - GetSliceMurmurHash); + return new CuckooTableBuilder(file, hash_table_ratio_, 64, max_search_depth_, + internal_comparator.user_comparator(), cuckoo_block_size_, nullptr); } std::string CuckooTableFactory::GetPrintableTableOptions() const { @@ -64,12 +43,16 @@ std::string CuckooTableFactory::GetPrintableTableOptions() const { snprintf(buffer, kBufferSize, " max_search_depth: %u\n", max_search_depth_); ret.append(buffer); + snprintf(buffer, kBufferSize, " cuckoo_block_size: %u\n", + cuckoo_block_size_); + ret.append(buffer); return ret; } TableFactory* NewCuckooTableFactory(double hash_table_ratio, - uint32_t max_search_depth) { - return new CuckooTableFactory(hash_table_ratio, max_search_depth); + uint32_t max_search_depth, uint32_t cuckoo_block_size) { + return new CuckooTableFactory( + hash_table_ratio, max_search_depth, cuckoo_block_size); } } // namespace rocksdb diff --git a/table/cuckoo_table_factory.h b/table/cuckoo_table_factory.h index 573d769e8..5799a7f23 100644 --- a/table/cuckoo_table_factory.h +++ b/table/cuckoo_table_factory.h @@ -8,11 +8,23 @@ #include #include "rocksdb/table.h" +#include "util/murmurhash.h" namespace rocksdb { -extern uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index, - uint64_t max_num_buckets); +const uint32_t kCuckooMurmurSeedMultiplier = 816922183; +static inline uint64_t CuckooHash( + const Slice& user_key, uint32_t hash_cnt, uint64_t table_size_minus_one, + uint64_t (*get_slice_hash)(const Slice&, uint32_t, uint64_t)) { +#ifndef NDEBUG + // This part is used only in unit tests. + if (get_slice_hash != nullptr) { + return get_slice_hash(user_key, hash_cnt, table_size_minus_one + 1); + } +#endif + return MurmurHash(user_key.data(), user_key.size(), + kCuckooMurmurSeedMultiplier * hash_cnt) & table_size_minus_one; +} // Cuckoo Table is designed for applications that require fast point lookups // but not fast range scans. @@ -23,9 +35,11 @@ extern uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index, // - Does not support Merge operations. class CuckooTableFactory : public TableFactory { public: - CuckooTableFactory(double hash_table_ratio, uint32_t max_search_depth) + CuckooTableFactory(double hash_table_ratio, uint32_t max_search_depth, + uint32_t cuckoo_block_size) : hash_table_ratio_(hash_table_ratio), - max_search_depth_(max_search_depth) {} + max_search_depth_(max_search_depth), + cuckoo_block_size_(cuckoo_block_size) {} ~CuckooTableFactory() {} const char* Name() const override { return "CuckooTable"; } @@ -41,7 +55,7 @@ class CuckooTableFactory : public TableFactory { CompressionType compression_type) const override; // Sanitizes the specified DB Options. - Status SanitizeDBOptions(DBOptions* db_opts) const override { + Status SanitizeDBOptions(const DBOptions* db_opts) const override { return Status::OK(); } @@ -50,6 +64,7 @@ class CuckooTableFactory : public TableFactory { private: const double hash_table_ratio_; const uint32_t max_search_depth_; + const uint32_t cuckoo_block_size_; }; } // namespace rocksdb diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index 636db5bfa..f1dcbc3bb 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -17,10 +17,14 @@ #include #include "rocksdb/iterator.h" #include "table/meta_blocks.h" +#include "table/cuckoo_table_factory.h" #include "util/arena.h" #include "util/coding.h" namespace rocksdb { +namespace { + static const uint64_t CACHE_LINE_MASK = ~((uint64_t)CACHE_LINE_SIZE - 1); +} extern const uint64_t kCuckooTableMagicNumber; @@ -44,12 +48,12 @@ CuckooTableReader::CuckooTableReader( } table_props_.reset(props); auto& user_props = props->user_collected_properties; - auto hash_funs = user_props.find(CuckooTablePropertyNames::kNumHashTable); + auto hash_funs = user_props.find(CuckooTablePropertyNames::kNumHashFunc); if (hash_funs == user_props.end()) { status_ = Status::InvalidArgument("Number of hash functions not found"); return; } - num_hash_fun_ = *reinterpret_cast(hash_funs->second.data()); + num_hash_func_ = *reinterpret_cast(hash_funs->second.data()); auto unused_key = user_props.find(CuckooTablePropertyNames::kEmptyKey); if (unused_key == user_props.end()) { status_ = Status::InvalidArgument("Empty bucket value not found"); @@ -67,18 +71,29 @@ CuckooTableReader::CuckooTableReader( value_length->second.data()); bucket_length_ = key_length_ + value_length_; - auto num_buckets = user_props.find(CuckooTablePropertyNames::kMaxNumBuckets); - if (num_buckets == user_props.end()) { - status_ = Status::InvalidArgument("Num buckets not found"); + auto hash_table_size = user_props.find( + CuckooTablePropertyNames::kHashTableSize); + if (hash_table_size == user_props.end()) { + status_ = Status::InvalidArgument("Hash table size not found"); return; } - num_buckets_ = *reinterpret_cast(num_buckets->second.data()); + table_size_minus_one_ = *reinterpret_cast( + hash_table_size->second.data()) - 1; auto is_last_level = user_props.find(CuckooTablePropertyNames::kIsLastLevel); if (is_last_level == user_props.end()) { status_ = Status::InvalidArgument("Is last level not found"); return; } is_last_level_ = *reinterpret_cast(is_last_level->second.data()); + auto cuckoo_block_size = user_props.find( + CuckooTablePropertyNames::kCuckooBlockSize); + if (cuckoo_block_size == user_props.end()) { + status_ = Status::InvalidArgument("Cuckoo block size not found"); + return; + } + cuckoo_block_size_ = *reinterpret_cast( + cuckoo_block_size->second.data()); + cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1; status_ = file_->Read(0, file_size, &file_data_, nullptr); } @@ -89,40 +104,46 @@ Status CuckooTableReader::Get( void (*mark_key_may_exist_handler)(void* handle_context)) { assert(key.size() == key_length_ + (is_last_level_ ? 8 : 0)); Slice user_key = ExtractUserKey(key); - for (uint32_t hash_cnt = 0; hash_cnt < num_hash_fun_; ++hash_cnt) { - uint64_t hash_val = get_slice_hash_(user_key, hash_cnt, num_buckets_); - assert(hash_val < num_buckets_); - const char* bucket = &file_data_.data()[hash_val * bucket_length_]; - if (ucomp_->Compare(Slice(unused_key_.data(), user_key.size()), - Slice(bucket, user_key.size())) == 0) { - return Status::OK(); - } - // Here, we compare only the user key part as we support only one entry - // per user key and we don't support sanpshot. - if (ucomp_->Compare(user_key, Slice(bucket, user_key.size())) == 0) { - Slice value = Slice(&bucket[key_length_], value_length_); - if (is_last_level_) { - ParsedInternalKey found_ikey(Slice(bucket, key_length_), 0, kTypeValue); - result_handler(handle_context, found_ikey, value); - } else { - Slice full_key(bucket, key_length_); - ParsedInternalKey found_ikey; - ParseInternalKey(full_key, &found_ikey); - result_handler(handle_context, found_ikey, value); + for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_; ++hash_cnt) { + uint64_t offset = bucket_length_ * CuckooHash( + user_key, hash_cnt, table_size_minus_one_, get_slice_hash_); + const char* bucket = &file_data_.data()[offset]; + for (uint32_t block_idx = 0; block_idx < cuckoo_block_size_; + ++block_idx, bucket += bucket_length_) { + if (ucomp_->Compare(Slice(unused_key_.data(), user_key.size()), + Slice(bucket, user_key.size())) == 0) { + return Status::OK(); + } + // Here, we compare only the user key part as we support only one entry + // per user key and we don't support sanpshot. + if (ucomp_->Compare(user_key, Slice(bucket, user_key.size())) == 0) { + Slice value = Slice(&bucket[key_length_], value_length_); + if (is_last_level_) { + ParsedInternalKey found_ikey( + Slice(bucket, key_length_), 0, kTypeValue); + result_handler(handle_context, found_ikey, value); + } else { + Slice full_key(bucket, key_length_); + ParsedInternalKey found_ikey; + ParseInternalKey(full_key, &found_ikey); + result_handler(handle_context, found_ikey, value); + } + // We don't support merge operations. So, we return here. + return Status::OK(); } - // We don't support merge operations. So, we return here. - return Status::OK(); } } return Status::OK(); } void CuckooTableReader::Prepare(const Slice& key) { + // Prefetch the first Cuckoo Block. Slice user_key = ExtractUserKey(key); - // Prefetching first location also helps improve Get performance. - for (uint32_t hash_cnt = 0; hash_cnt < num_hash_fun_; ++hash_cnt) { - uint64_t hash_val = get_slice_hash_(user_key, hash_cnt, num_buckets_); - PREFETCH(&file_data_.data()[hash_val * bucket_length_], 0, 3); + uint64_t addr = reinterpret_cast(file_data_.data()) + + bucket_length_ * CuckooHash(user_key, 0, table_size_minus_one_, nullptr); + uint64_t end_addr = addr + cuckoo_block_bytes_minus_one_; + for (addr &= CACHE_LINE_MASK; addr < end_addr; addr += CACHE_LINE_SIZE) { + PREFETCH(reinterpret_cast(addr), 0, 3); } } @@ -186,7 +207,9 @@ CuckooTableIterator::CuckooTableIterator(CuckooTableReader* reader) void CuckooTableIterator::LoadKeysFromReader() { key_to_bucket_id_.reserve(reader_->GetTableProperties()->num_entries); - for (uint32_t bucket_id = 0; bucket_id < reader_->num_buckets_; bucket_id++) { + uint64_t num_buckets = reader_->table_size_minus_one_ + + reader_->cuckoo_block_size_; + for (uint32_t bucket_id = 0; bucket_id < num_buckets; bucket_id++) { Slice read_key; status_ = reader_->file_->Read(bucket_id * reader_->bucket_length_, reader_->key_length_, &read_key, nullptr); diff --git a/table/cuckoo_table_reader.h b/table/cuckoo_table_reader.h index ad5d4ec47..05d5c3397 100644 --- a/table/cuckoo_table_reader.h +++ b/table/cuckoo_table_reader.h @@ -65,12 +65,14 @@ class CuckooTableReader: public TableReader { bool is_last_level_; std::shared_ptr table_props_; Status status_; - uint32_t num_hash_fun_; + uint32_t num_hash_func_; std::string unused_key_; uint32_t key_length_; uint32_t value_length_; uint32_t bucket_length_; - uint64_t num_buckets_; + uint32_t cuckoo_block_size_; + uint32_t cuckoo_block_bytes_minus_one_; + uint64_t table_size_minus_one_; const Comparator* ucomp_; uint64_t (*get_slice_hash_)(const Slice& s, uint32_t index, uint64_t max_num_buckets); diff --git a/table/cuckoo_table_reader_test.cc b/table/cuckoo_table_reader_test.cc index c026a2742..63fe0ae5b 100644 --- a/table/cuckoo_table_reader_test.cc +++ b/table/cuckoo_table_reader_test.cc @@ -38,9 +38,6 @@ DEFINE_bool(write, false, namespace rocksdb { -extern const uint64_t kCuckooTableMagicNumber; -extern const uint64_t kMaxNumHashTable; - namespace { const uint32_t kNumHashFunc = 10; // Methods, variables related to Hash functions. @@ -109,7 +106,7 @@ class CuckooReaderTest { std::unique_ptr writable_file; ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); CuckooTableBuilder builder( - writable_file.get(), 0.9, kNumHashFunc, 100, ucomp, GetSliceHash); + writable_file.get(), 0.9, kNumHashFunc, 100, ucomp, 2, GetSliceHash); ASSERT_OK(builder.status()); for (uint32_t key_idx = 0; key_idx < num_items; ++key_idx) { builder.Add(Slice(keys[key_idx]), Slice(values[key_idx])); @@ -397,13 +394,12 @@ void GetKeys(uint64_t num, std::vector* keys) { } } -std::string GetFileName(uint64_t num, double hash_ratio) { +std::string GetFileName(uint64_t num) { if (FLAGS_file_dir.empty()) { FLAGS_file_dir = test::TmpDir(); } return FLAGS_file_dir + "/cuckoo_read_benchmark" + - std::to_string(num/1000000) + "Mratio" + - std::to_string(static_cast(100*hash_ratio)); + std::to_string(num/1000000) + "Mkeys"; } // Create last level file as we are interested in measuring performance of @@ -414,13 +410,13 @@ void WriteFile(const std::vector& keys, options.allow_mmap_reads = true; Env* env = options.env; EnvOptions env_options = EnvOptions(options); - std::string fname = GetFileName(num, hash_ratio); + std::string fname = GetFileName(num); std::unique_ptr writable_file; ASSERT_OK(env->NewWritableFile(fname, &writable_file, env_options)); CuckooTableBuilder builder( writable_file.get(), hash_ratio, - kMaxNumHashTable, 1000, test::Uint64Comparator(), GetSliceMurmurHash); + 64, 1000, test::Uint64Comparator(), 5, nullptr); ASSERT_OK(builder.status()); for (uint64_t key_idx = 0; key_idx < num; ++key_idx) { // Value is just a part of key. @@ -439,27 +435,25 @@ void WriteFile(const std::vector& keys, CuckooTableReader reader( options, std::move(read_file), file_size, - test::Uint64Comparator(), GetSliceMurmurHash); + test::Uint64Comparator(), nullptr); ASSERT_OK(reader.status()); ReadOptions r_options; - for (const auto& key : keys) { + for (uint64_t i = 0; i < num; ++i) { int cnt = 0; - ASSERT_OK(reader.Get(r_options, Slice(key), &cnt, CheckValue, nullptr)); + ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &cnt, CheckValue, nullptr)); if (cnt != 1) { - fprintf(stderr, "%" PRIx64 " not found.\n", - *reinterpret_cast(key.data())); + fprintf(stderr, "%" PRIu64 " not found.\n", i); ASSERT_EQ(1, cnt); } } } -void ReadKeys(const std::vector& keys, uint64_t num, - double hash_ratio, uint32_t batch_size) { +void ReadKeys(uint64_t num, uint32_t batch_size) { Options options; options.allow_mmap_reads = true; Env* env = options.env; EnvOptions env_options = EnvOptions(options); - std::string fname = GetFileName(num, hash_ratio); + std::string fname = GetFileName(num); uint64_t file_size; env->GetFileSize(fname, &file_size); @@ -468,29 +462,33 @@ void ReadKeys(const std::vector& keys, uint64_t num, CuckooTableReader reader( options, std::move(read_file), file_size, test::Uint64Comparator(), - GetSliceMurmurHash); + nullptr); ASSERT_OK(reader.status()); const UserCollectedProperties user_props = reader.GetTableProperties()->user_collected_properties; const uint32_t num_hash_fun = *reinterpret_cast( - user_props.at(CuckooTablePropertyNames::kNumHashTable).data()); - fprintf(stderr, "With %" PRIu64 " items and hash table ratio %f, number of" - " hash functions used: %u.\n", num, hash_ratio, num_hash_fun); + user_props.at(CuckooTablePropertyNames::kNumHashFunc).data()); + const uint64_t table_size = *reinterpret_cast( + user_props.at(CuckooTablePropertyNames::kHashTableSize).data()); + fprintf(stderr, "With %" PRIu64 " items, utilization is %.2f%%, number of" + " hash functions: %u.\n", num, num * 100.0 / (table_size), num_hash_fun); ReadOptions r_options; uint64_t start_time = env->NowMicros(); if (batch_size > 0) { for (uint64_t i = 0; i < num; i += batch_size) { for (uint64_t j = i; j < i+batch_size && j < num; ++j) { - reader.Prepare(Slice(keys[j])); + reader.Prepare(Slice(reinterpret_cast(&j), 16)); } for (uint64_t j = i; j < i+batch_size && j < num; ++j) { - reader.Get(r_options, Slice(keys[j]), nullptr, DoNothing, nullptr); + reader.Get(r_options, Slice(reinterpret_cast(&j), 16), + nullptr, DoNothing, nullptr); } } } else { for (uint64_t i = 0; i < num; i++) { - reader.Get(r_options, Slice(keys[i]), nullptr, DoNothing, nullptr); + reader.Get(r_options, Slice(reinterpret_cast(&i), 16), nullptr, + DoNothing, nullptr); } } float time_per_op = (env->NowMicros() - start_time) * 1.0 / num; @@ -501,26 +499,30 @@ void ReadKeys(const std::vector& keys, uint64_t num, } // namespace. TEST(CuckooReaderTest, TestReadPerformance) { - uint64_t num = 1000*1000*100; if (!FLAGS_enable_perf) { return; } + double hash_ratio = 0.95; + // These numbers are chosen to have a hash utilizaiton % close to + // 0.9, 0.75, 0.6 and 0.5 respectively. + // They all create 128 M buckets. + std::vector nums = {120*1000*1000, 100*1000*1000, 80*1000*1000, + 70*1000*1000}; #ifndef NDEBUG fprintf(stdout, "WARNING: Not compiled with DNDEBUG. Performance tests may be slow.\n"); #endif std::vector keys; - GetKeys(num, &keys); - for (double hash_ratio : std::vector({0.5, 0.6, 0.75, 0.9})) { - if (FLAGS_write || !Env::Default()->FileExists( - GetFileName(num, hash_ratio))) { + GetKeys(*std::max_element(nums.begin(), nums.end()), &keys); + for (uint64_t num : nums) { + if (FLAGS_write || !Env::Default()->FileExists(GetFileName(num))) { WriteFile(keys, num, hash_ratio); } - ReadKeys(keys, num, hash_ratio, 0); - ReadKeys(keys, num, hash_ratio, 10); - ReadKeys(keys, num, hash_ratio, 25); - ReadKeys(keys, num, hash_ratio, 50); - ReadKeys(keys, num, hash_ratio, 100); + ReadKeys(num, 0); + ReadKeys(num, 10); + ReadKeys(num, 25); + ReadKeys(num, 50); + ReadKeys(num, 100); fprintf(stderr, "\n"); } } diff --git a/table/filter_block.cc b/table/filter_block.cc index 8366db268..6b4ff1c10 100644 --- a/table/filter_block.cc +++ b/table/filter_block.cc @@ -71,20 +71,14 @@ void FilterBlockBuilder::AddKey(const Slice& key) { } // add prefix to filter if needed - if (prefix_extractor_ && prefix_extractor_->InDomain(ExtractUserKey(key))) { - // If prefix_extractor_, this filter_block layer assumes we only - // operate on internal keys. - Slice user_key = ExtractUserKey(key); + if (prefix_extractor_ && prefix_extractor_->InDomain(key)) { // this assumes prefix(prefix(key)) == prefix(key), as the last // entry in entries_ may be either a key or prefix, and we use // prefix(last entry) to get the prefix of the last key. - if (prev.size() == 0 || - !SamePrefix(user_key, ExtractUserKey(prev))) { - Slice prefix = prefix_extractor_->Transform(user_key); - InternalKey internal_prefix_tmp(prefix, 0, kTypeValue); - Slice internal_prefix = internal_prefix_tmp.Encode(); + if (prev.size() == 0 || !SamePrefix(key, prev)) { + Slice prefix = prefix_extractor_->Transform(key); start_.push_back(entries_.size()); - entries_.append(internal_prefix.data(), internal_prefix.size()); + entries_.append(prefix.data(), prefix.size()); } } } diff --git a/table/format.cc b/table/format.cc index a642965d5..46105247f 100644 --- a/table/format.cc +++ b/table/format.cc @@ -211,10 +211,13 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, Slice* contents, /* result of reading */ char* buf) { size_t n = static_cast(handle.size()); + Status s; + + { + PERF_TIMER_GUARD(block_read_time); + s = file->Read(handle.offset(), n + kBlockTrailerSize, contents, buf); + } - PERF_TIMER_AUTO(block_read_time); - Status s = file->Read(handle.offset(), n + kBlockTrailerSize, contents, buf); - PERF_TIMER_MEASURE(block_read_time); PERF_COUNTER_ADD(block_read_count, 1); PERF_COUNTER_ADD(block_read_byte, n + kBlockTrailerSize); @@ -228,6 +231,7 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer, // Check the crc of the type and the block contents const char* data = contents->data(); // Pointer to where Read put the data if (options.verify_checksums) { + PERF_TIMER_GUARD(block_checksum_time); uint32_t value = DecodeFixed32(data + n + 1); uint32_t actual = 0; switch (footer.checksum()) { @@ -247,7 +251,6 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer, if (!s.ok()) { return s; } - PERF_TIMER_STOP(block_checksum_time); } return s; } @@ -265,7 +268,7 @@ Status DecompressBlock(BlockContents* result, size_t block_size, result->cachable = false; result->heap_allocated = false; - PERF_TIMER_AUTO(block_decompress_time); + PERF_TIMER_GUARD(block_decompress_time); rocksdb::CompressionType compression_type = static_cast(data[n]); // If the caller has requested that the block not be uncompressed @@ -295,7 +298,6 @@ Status DecompressBlock(BlockContents* result, size_t block_size, } else { s = UncompressBlockContents(data, n, result); } - PERF_TIMER_STOP(block_decompress_time); return s; } diff --git a/table/merger.cc b/table/merger.cc index 611480cec..a53376ceb 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -116,12 +116,12 @@ class MergingIterator : public Iterator { // Invalidate the heap. use_heap_ = false; IteratorWrapper* first_child = nullptr; - PERF_TIMER_DECLARE(); for (auto& child : children_) { - PERF_TIMER_START(seek_child_seek_time); - child.Seek(target); - PERF_TIMER_STOP(seek_child_seek_time); + { + PERF_TIMER_GUARD(seek_child_seek_time); + child.Seek(target); + } PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { @@ -134,24 +134,21 @@ class MergingIterator : public Iterator { } else { // We have more than one children with valid keys. Initialize // the heap and put the first child into the heap. - PERF_TIMER_START(seek_min_heap_time); + PERF_TIMER_GUARD(seek_min_heap_time); ClearHeaps(); minHeap_.push(first_child); - PERF_TIMER_STOP(seek_min_heap_time); } } if (use_heap_) { - PERF_TIMER_START(seek_min_heap_time); + PERF_TIMER_GUARD(seek_min_heap_time); minHeap_.push(&child); - PERF_TIMER_STOP(seek_min_heap_time); } } } if (use_heap_) { // If heap is valid, need to put the smallest key to curent_. - PERF_TIMER_START(seek_min_heap_time); + PERF_TIMER_GUARD(seek_min_heap_time); FindSmallest(); - PERF_TIMER_STOP(seek_min_heap_time); } else { // The heap is not valid, then the current_ iterator is the first // one, or null if there is no first child. diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index a95f4c119..d9d0ed6c9 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -16,9 +16,7 @@ namespace rocksdb { MetaIndexBuilder::MetaIndexBuilder() - : meta_index_block_( - new BlockBuilder(1 /* restart interval */, BytewiseComparator())) { -} + : meta_index_block_(new BlockBuilder(1 /* restart interval */)) {} void MetaIndexBuilder::Add(const std::string& key, const BlockHandle& handle) { @@ -35,9 +33,7 @@ Slice MetaIndexBuilder::Finish() { } PropertyBlockBuilder::PropertyBlockBuilder() - : properties_block_( - new BlockBuilder(1 /* restart interval */, BytewiseComparator())) { -} + : properties_block_(new BlockBuilder(1 /* restart interval */)) {} void PropertyBlockBuilder::Add(const std::string& name, const std::string& val) { diff --git a/table/plain_table_factory.h b/table/plain_table_factory.h index 31e20b016..d1cf0cae6 100644 --- a/table/plain_table_factory.h +++ b/table/plain_table_factory.h @@ -169,7 +169,7 @@ class PlainTableFactory : public TableFactory { static const char kValueTypeSeqId0 = 0xFF; // Sanitizes the specified DB Options. - Status SanitizeDBOptions(DBOptions* db_opts) const override { + Status SanitizeDBOptions(const DBOptions* db_opts) const override { if (db_opts->allow_mmap_reads == false) { return Status::NotSupported( "PlainTable with allow_mmap_reads == false is not supported."); diff --git a/table/table_test.cc b/table/table_test.cc index 929cdf832..500abf48f 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -244,8 +244,7 @@ class BlockConstructor: public Constructor { const KVMap& data) { delete block_; block_ = nullptr; - BlockBuilder builder(table_options.block_restart_interval, - &internal_comparator); + BlockBuilder builder(table_options.block_restart_interval); for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -1054,7 +1053,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) { ASSERT_EQ("", props.filter_policy_name); // no filter policy is used // Verify data size. - BlockBuilder block_builder(1, options.comparator); + BlockBuilder block_builder(1); for (const auto& item : kvmap) { block_builder.Add(item.first, item.second); } diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 6af48f58c..ae4e46239 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -172,8 +172,9 @@ void TwoLevelIterator::InitDataBlock() { SetSecondLevelIterator(nullptr); } else { Slice handle = first_level_iter_.value(); - if (second_level_iter_.iter() != nullptr - && handle.compare(data_block_handle_) == 0) { + if (second_level_iter_.iter() != nullptr && + !second_level_iter_.status().IsIncomplete() && + handle.compare(data_block_handle_) == 0) { // second_level_iter is already constructed with this iterator, so // no need to change anything } else { diff --git a/tools/db_stress.cc b/tools/db_stress.cc index cffcb1c47..e9955953d 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -31,6 +31,7 @@ int main() { #include #include #include +#include #include #include "db/db_impl.h" #include "db/version_set.h" @@ -759,7 +760,7 @@ class StressTest { ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), db_(nullptr), - new_column_family_name_(0), + new_column_family_name_(1), num_times_reopened_(0) { if (FLAGS_destroy_db_initially) { std::vector files; @@ -1217,12 +1218,20 @@ class StressTest { Status s __attribute__((unused)); s = db_->DropColumnFamily(column_families_[cf]); delete column_families_[cf]; - assert(s.ok()); + if (!s.ok()) { + fprintf(stderr, "dropping column family error: %s\n", + s.ToString().c_str()); + std::terminate(); + } s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, &column_families_[cf]); column_family_names_[cf] = new_name; thread->shared->ClearColumnFamily(cf); - assert(s.ok()); + if (!s.ok()) { + fprintf(stderr, "creating column family error: %s\n", + s.ToString().c_str()); + std::terminate(); + } thread->shared->UnlockColumnFamily(cf); } } @@ -1297,10 +1306,15 @@ class StressTest { } } thread->shared->Put(rand_column_family, rand_key, value_base); + Status s; if (FLAGS_use_merge) { - db_->Merge(write_opts, column_family, key, v); + s = db_->Merge(write_opts, column_family, key, v); } else { - db_->Put(write_opts, column_family, key, v); + s = db_->Put(write_opts, column_family, key, v); + } + if (!s.ok()) { + fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); + std::terminate(); } thread->stats.AddBytesForWrites(1, sz); } else { @@ -1311,8 +1325,12 @@ class StressTest { // OPERATION delete if (!FLAGS_test_batches_snapshots) { thread->shared->Delete(rand_column_family, rand_key); - db_->Delete(write_opts, column_family, key); + Status s = db_->Delete(write_opts, column_family, key); thread->stats.AddDeletes(1); + if (!s.ok()) { + fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); + std::terminate(); + } } else { MultiDelete(thread, write_opts, column_family, key); } diff --git a/util/env_posix.cc b/util/env_posix.cc index d644e7b0e..cf917e874 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -239,11 +239,23 @@ class PosixRandomAccessFile: public RandomAccessFile { char* scratch) const { Status s; ssize_t r = -1; - do { - r = pread(fd_, scratch, n, static_cast(offset)); - } while (r < 0 && errno == EINTR); - IOSTATS_ADD_IF_POSITIVE(bytes_read, r); - *result = Slice(scratch, (r < 0) ? 0 : r); + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast(offset)); + if (r <= 0) { + if (errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + } + + IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left); + *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { // An error: return a non-ok status s = IOError(filename_, errno); @@ -907,9 +919,23 @@ class PosixRandomRWFile : public RandomRWFile { virtual Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { Status s; - ssize_t r = pread(fd_, scratch, n, static_cast(offset)); - IOSTATS_ADD_IF_POSITIVE(bytes_read, r); - *result = Slice(scratch, (r < 0) ? 0 : r); + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast(offset)); + if (r <= 0) { + if (errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + } + IOSTATS_ADD_IF_POSITIVE(bytes_read, n - left); + *result = Slice(scratch, (r < 0) ? 0 : n - left); if (r < 0) { s = IOError(filename_, errno); } @@ -1018,15 +1044,12 @@ class PosixFileLock : public FileLock { std::string filename; }; - -namespace { void PthreadCall(const char* label, int result) { if (result != 0) { fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); exit(1); } } -} class PosixEnv : public Env { public: @@ -1724,12 +1747,11 @@ unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { return thread_pools_[pri].GetQueueLen(); } -namespace { struct StartThreadState { void (*user_function)(void*); void* arg; }; -} + static void* StartThreadWrapper(void* arg) { StartThreadState* state = reinterpret_cast(arg); state->user_function(state->arg); diff --git a/util/iostats_context.cc b/util/iostats_context.cc index 610831779..090813abc 100644 --- a/util/iostats_context.cc +++ b/util/iostats_context.cc @@ -9,7 +9,9 @@ namespace rocksdb { +#ifndef IOS_CROSS_COMPILE __thread IOStatsContext iostats_context; +#endif // IOS_CROSS_COMPILE void IOStatsContext::Reset() { thread_pool_id = Env::Priority::TOTAL; diff --git a/util/iostats_context_imp.h b/util/iostats_context_imp.h index ed34037d3..b271ddf14 100644 --- a/util/iostats_context_imp.h +++ b/util/iostats_context_imp.h @@ -6,6 +6,8 @@ #pragma once #include "rocksdb/iostats_context.h" +#ifndef IOS_CROSS_COMPILE + // increment a specific counter by the specified value #define IOSTATS_ADD(metric, value) \ (iostats_context.metric += value) @@ -30,3 +32,15 @@ #define IOSTATS(metric) \ (iostats_context.metric) + +#else // IOS_CROSS_COMPILE + +#define IOSTATS_ADD(metric, value) +#define IOSTATS_ADD_IF_POSITIVE(metric, value) +#define IOSTATS_RESET(metric) +#define IOSTATS_RESET_ALL() +#define IOSTATS_SET_THREAD_POOL_ID(value) +#define IOSTATS_THREAD_POOL_ID() +#define IOSTATS(metric) 0 + +#endif // IOS_CROSS_COMPILE diff --git a/util/perf_context_imp.h b/util/perf_context_imp.h index dc4ae95e5..e39790105 100644 --- a/util/perf_context_imp.h +++ b/util/perf_context_imp.h @@ -11,11 +11,10 @@ namespace rocksdb { #if defined(NPERF_CONTEXT) || defined(IOS_CROSS_COMPILE) -#define PERF_TIMER_DECLARE() -#define PERF_TIMER_START(metric) -#define PERF_TIMER_AUTO(metric) +#define PERF_TIMER_GUARD(metric) #define PERF_TIMER_MEASURE(metric) #define PERF_TIMER_STOP(metric) +#define PERF_TIMER_START(metric) #define PERF_COUNTER_ADD(metric, value) #else @@ -24,10 +23,15 @@ extern __thread PerfLevel perf_level; class PerfStepTimer { public: - PerfStepTimer() + PerfStepTimer(uint64_t* metric) : enabled_(perf_level >= PerfLevel::kEnableTime), env_(enabled_ ? Env::Default() : nullptr), - start_(0) { + start_(0), + metric_(metric) { + } + + ~PerfStepTimer() { + Stop(); } void Start() { @@ -36,17 +40,17 @@ class PerfStepTimer { } } - void Measure(uint64_t* metric) { + void Measure() { if (start_) { uint64_t now = env_->NowNanos(); - *metric += now - start_; + *metric_ += now - start_; start_ = now; } } - void Stop(uint64_t* metric) { + void Stop() { if (start_) { - *metric += env_->NowNanos() - start_; + *metric_ += env_->NowNanos() - start_; start_ = 0; } } @@ -55,29 +59,25 @@ class PerfStepTimer { const bool enabled_; Env* const env_; uint64_t start_; + uint64_t* metric_; }; -// Declare the local timer object to be used later on -#define PERF_TIMER_DECLARE() \ - PerfStepTimer perf_step_timer; +// Stop the timer and update the metric +#define PERF_TIMER_STOP(metric) \ + perf_step_timer_ ## metric.Stop(); -// Set start time of the timer #define PERF_TIMER_START(metric) \ - perf_step_timer.Start(); + perf_step_timer_ ## metric.Start(); // Declare and set start time of the timer -#define PERF_TIMER_AUTO(metric) \ - PerfStepTimer perf_step_timer; \ - perf_step_timer.Start(); +#define PERF_TIMER_GUARD(metric) \ + PerfStepTimer perf_step_timer_ ## metric(&(perf_context.metric)); \ + perf_step_timer_ ## metric.Start(); // Update metric with time elapsed since last START. start time is reset // to current timestamp. #define PERF_TIMER_MEASURE(metric) \ - perf_step_timer.Measure(&(perf_context.metric)); - -// Update metric with time elapsed since last START. But start time is not set. -#define PERF_TIMER_STOP(metric) \ - perf_step_timer.Stop(&(perf_context.metric)); + perf_step_timer_ ## metric.Measure(); // Increase metric value #define PERF_COUNTER_ADD(metric, value) \ diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index 7de2acc86..8b9e49bd4 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -621,10 +621,13 @@ class SpatialDBImpl : public SpatialDB { namespace { DBOptions GetDBOptions(const SpatialDBOptions& options) { DBOptions db_options; - db_options.max_background_compactions = options.num_threads / 2; - db_options.max_background_flushes = options.num_threads / 2; - db_options.env->SetBackgroundThreads(db_options.max_background_compactions, Env::LOW); - db_options.env->SetBackgroundThreads(db_options.max_background_flushes, Env::HIGH); + db_options.max_background_compactions = 3 * options.num_threads / 4; + db_options.max_background_flushes = + options.num_threads - db_options.max_background_compactions; + db_options.env->SetBackgroundThreads(db_options.max_background_compactions, + Env::LOW); + db_options.env->SetBackgroundThreads(db_options.max_background_flushes, + Env::HIGH); if (options.bulk_load) { db_options.disableDataSync = true; } @@ -634,14 +637,16 @@ DBOptions GetDBOptions(const SpatialDBOptions& options) { ColumnFamilyOptions GetColumnFamilyOptions(const SpatialDBOptions& options, std::shared_ptr block_cache) { ColumnFamilyOptions column_family_options; - column_family_options.write_buffer_size = 128 * 1024 * 1024; // 128MB - column_family_options.max_bytes_for_level_base = 1024 * 1024 * 1024; // 1 GB + column_family_options.write_buffer_size = 128 * 1024 * 1024; // 128MB column_family_options.max_write_buffer_number = 4; - // only compress levels >= 1 + column_family_options.level0_file_num_compaction_trigger = 2; + column_family_options.level0_slowdown_writes_trigger = 16; + column_family_options.level0_slowdown_writes_trigger = 32; + // only compress levels >= 2 column_family_options.compression_per_level.resize( column_family_options.num_levels); for (int i = 0; i < column_family_options.num_levels; ++i) { - if (i == 0) { + if (i < 2) { column_family_options.compression_per_level[i] = kNoCompression; } else { column_family_options.compression_per_level[i] = kLZ4Compression; @@ -651,17 +656,6 @@ ColumnFamilyOptions GetColumnFamilyOptions(const SpatialDBOptions& options, table_options.block_cache = block_cache; column_family_options.table_factory.reset( NewBlockBasedTableFactory(table_options)); - if (options.bulk_load) { - column_family_options.level0_file_num_compaction_trigger = (1 << 30); - column_family_options.level0_slowdown_writes_trigger = (1 << 30); - column_family_options.level0_stop_writes_trigger = (1 << 30); - column_family_options.disable_auto_compactions = true; - column_family_options.source_compaction_factor = (1 << 30); - column_family_options.num_levels = 2; - column_family_options.target_file_size_base = 256 * 1024 * 1024; - column_family_options.max_mem_compaction_level = 0; - column_family_options.memtable_factory.reset(new VectorRepFactory()); - } return column_family_options; }