diff --git a/HISTORY.md b/HISTORY.md
index 3ae92d365..7d4766ce9 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,8 +1,10 @@
 # Rocksdb Change Log
 ## Unreleased
+### New Features
 * Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, warm/hot blobs that are already in memory are prepopulated into the blob cache at the time of flush. On a flush, the blobs that are in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read these blobs back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in the case of a remote file system, since reading the data back involves network traffic and higher latencies.
 * Support using a secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
 * Charge memory usage of the blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for the blob cache exceeds the available space left in the block cache at some point (i.e., causing a cache-full condition under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in to this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`.
+* Improve subcompaction range partitioning so that it is likely to be more even. A more even distribution of subcompactions improves compaction throughput for some workloads. All input files' index blocks are sampled for anchor key points, from which we pick positions to partition the input range. This introduces some CPU overhead in the compaction preparation phase if subcompaction is enabled, but it should be a small fraction of the CPU usage of the whole compaction process. This also brings a behavior change: the number of subcompactions is much more likely to be maxed out than before.
 ### Public API changes
 * Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
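For readers who want to see how the options mentioned above fit together, here is a minimal, hedged sketch (not part of the patch) of a user program that enables blob files with a blob cache, attaches a secondary cache via `LRUCacheOptions::secondary_cache`, turns on `prepopulate_blob_cache`, and sets `max_subcompactions`, which is the knob the subcompaction partitioning change makes more effective. The cache sizes and the database path are arbitrary illustrations, and `NewCompressedSecondaryCache` is used only as one example secondary cache implementation; check the exact factory signature against your RocksDB version. Charging blob cache memory against the block cache is configured separately through `BlockBasedTableOptions::cache_usage_options` and is not shown here.

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // Store large values in blob files and cache them in a dedicated blob cache.
  options.enable_blob_files = true;
  options.min_blob_size = 1024;

  rocksdb::LRUCacheOptions blob_cache_opts;
  blob_cache_opts.capacity = 256 << 20;  // 256 MB; arbitrary for illustration

  // Optional: back the blob cache with a secondary cache (new in this release).
  // NewCompressedSecondaryCache is just one possible SecondaryCache here.
  rocksdb::CompressedSecondaryCacheOptions secondary_opts;
  secondary_opts.capacity = 1 << 30;  // 1 GB; arbitrary for illustration
  blob_cache_opts.secondary_cache =
      rocksdb::NewCompressedSecondaryCache(secondary_opts);

  options.blob_cache = rocksdb::NewLRUCache(blob_cache_opts);

  // Warm the blob cache with blobs written by flushes (new in this release).
  options.prepopulate_blob_cache = rocksdb::PrepopulateBlobCache::kFlushOnly;

  // The improved range partitioning only matters when more than one
  // subcompaction is allowed; the configured value is now much more likely
  // to be fully used.
  options.max_subcompactions = 4;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/blob_cache_demo", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}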
diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index f35b2b5ca..c883716a0 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -672,8 +672,7 @@ bool Compaction::ShouldFormSubcompactions() const {
   }
   if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
-    return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0 &&
-           !IsOutputLevelEmpty();
+    return (start_level_ == 0 || is_manual_compaction_) && output_level_ > 0;
   } else if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) {
     return number_levels_ > 1 && output_level_ > 0;
   } else {
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index b914f5e9d..c78bf2adc 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include <optional>
 #include
 #include
 #include
@@ -30,6 +31,7 @@
 #include "db/log_writer.h"
 #include "db/merge_helper.h"
 #include "db/range_del_aggregator.h"
+#include "db/version_edit.h"
 #include "db/version_set.h"
 #include "file/filename.h"
 #include "file/read_write_util.h"
@@ -44,6 +46,7 @@
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/options.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table.h"
@@ -232,24 +235,22 @@ void CompactionJob::Prepare() {
   bottommost_level_ = c->bottommost_level();
   if (c->ShouldFormSubcompactions()) {
-    {
     StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
     GenSubcompactionBoundaries();
-    }
-
+  }
+  if (boundaries_.size() > 1) {
     for (size_t i = 0; i <= boundaries_.size(); i++) {
-      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
-      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
-      compact_->sub_compact_states.emplace_back(c, start, end,
-                                                static_cast<uint32_t>(i));
+      compact_->sub_compact_states.emplace_back(
+          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
+          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
+                                    : std::nullopt,
+          static_cast<uint32_t>(i));
     }
     RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                       compact_->sub_compact_states.size());
   } else {
-    constexpr Slice* start = nullptr;
-    constexpr Slice* end = nullptr;
-
-    compact_->sub_compact_states.emplace_back(c, start, end, /*sub_job_id*/ 0);
+    compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
+                                              /*sub_job_id*/ 0);
   }
   if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
@@ -299,15 +300,48 @@ struct RangeWithSize {
 };
 void CompactionJob::GenSubcompactionBoundaries() {
+  // The goal is to find some boundary keys so that we can evenly partition
+  // the compaction input data into max_subcompactions ranges.
+  // For every input file, we ask TableReader to estimate 128 anchor points
+  // that evenly partition the input file into 128 ranges, together with the
+  // size of each range. This can be calculated by scanning index blocks of
+  // the file. Once we have the anchor points for all the input files, we
+  // merge them together and try to find keys that divide the ranges evenly.
+  // For example, if we have two input files, and each returns the following
+  // ranges:
+  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
+  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
+  // We sort all the keys into the following order:
+  //   (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
+  // We calculate the total size by adding up all ranges' sizes, which is 6400.
+  // If we would like to partition into 2 subcompactions, the target range
+  // size is 3200. Based on the sizes, we take "b1" as the partition key,
+  // since the cumulative size of the first three ranges reaches 3200.
+  //
+  // Note that the ranges actually overlap. For example, in the example
+  // above, the range ending with "b1" overlaps with the range ending with
+  // "b2". So the size 1000+1100+1200 is an underestimation of the data size
+  // up to "b1". In extreme cases where we only compact N L0 files, a range
+  // can overlap with N-1 other ranges. Since we requested a relatively large
+  // number (128) of ranges from each input file, even N-way range overlap
+  // should cause relatively small inaccuracy.
+  auto* c = compact_->compaction;
+  if (c->max_subcompactions() <= 1) {
+    return;
+  }
   auto* cfd = c->column_family_data();
   const Comparator* cfd_comparator = cfd->user_comparator();
-  std::vector<Slice> bounds;
+  const InternalKeyComparator& icomp = cfd->internal_comparator();
+
+  auto* v = compact_->compaction->input_version();
+  int base_level = v->storage_info()->base_level();
+  InstrumentedMutexUnlock unlock_guard(db_mutex_);
+
+  uint64_t total_size = 0;
+  std::vector<TableReader::Anchor> all_anchors;
   int start_lvl = c->start_level();
   int out_lvl = c->output_level();
-
-  // Add the starting and/or ending key of certain input files as a potential
-  // boundary
   for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
     int lvl = c->level(lvl_idx);
     if (lvl >= start_lvl && lvl <= out_lvl) {
       const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
       size_t num_files = flevel->num_files;
       if (num_files == 0) {
         continue;
       }
-      if (lvl == 0) {
-        // For level 0 add the starting and ending key of each file since the
-        // files may have greatly differing key ranges (not range-partitioned)
-        for (size_t i = 0; i < num_files; i++) {
-          bounds.emplace_back(flevel->files[i].smallest_key);
-          bounds.emplace_back(flevel->files[i].largest_key);
+      for (size_t i = 0; i < num_files; i++) {
+        FileMetaData* f = flevel->files[i].file_metadata;
+        std::vector<TableReader::Anchor> my_anchors;
+        Status s = cfd->table_cache()->ApproximateKeyAnchors(
+            ReadOptions(), icomp, f->fd, my_anchors);
+        if (!s.ok() || my_anchors.empty()) {
+          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
         }
-      } else {
-        // For all other levels add the smallest/largest key in the level to
-        // encompass the range covered by that level
-        bounds.emplace_back(flevel->files[0].smallest_key);
-        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
-        if (lvl == out_lvl) {
-          // For the last level include the starting keys of all files since
-          // the last level is the largest and probably has the widest key
-          // range. Since it's range partitioned, the ending key of one file
-          // and the starting key of the next are very close (or identical).
-          for (size_t i = 1; i < num_files; i++) {
-            bounds.emplace_back(flevel->files[i].smallest_key);
-          }
+        for (auto& ac : my_anchors) {
+          // Can be optimized to avoid this loop.
+          total_size += ac.range_size;
         }
-      }
-    }
-  }
-
-  std::sort(bounds.begin(), bounds.end(),
-            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-              return cfd_comparator->Compare(ExtractUserKey(a),
-                                             ExtractUserKey(b)) < 0;
-            });
-  // Remove duplicated entries from bounds
-  bounds.erase(
-      std::unique(bounds.begin(), bounds.end(),
-                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
-                    return cfd_comparator->Compare(ExtractUserKey(a),
-                                                   ExtractUserKey(b)) == 0;
-                  }),
-      bounds.end());
-
-  // Combine consecutive pairs of boundaries into ranges with an approximate
-  // size of data covered by keys in that range
-  uint64_t sum = 0;
-  std::vector<RangeWithSize> ranges;
-  // Get input version from CompactionState since it's already referenced
-  // earlier in SetInputVersioCompaction::SetInputVersion and will not change
-  // when db_mutex_ is released below
-  auto* v = compact_->compaction->input_version();
-  for (auto it = bounds.begin();;) {
-    const Slice a = *it;
-    ++it;
-    if (it == bounds.end()) {
-      break;
+        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
+                           my_anchors.end());
+      }
     }
-
-    const Slice b = *it;
-
-    // ApproximateSize could potentially create table reader iterator to seek
-    // to the index block and may incur I/O cost in the process. Unlock db
-    // mutex to reduce contention
-    db_mutex_->Unlock();
-    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
-                                               b, start_lvl, out_lvl + 1,
-                                               TableReaderCaller::kCompaction);
-    db_mutex_->Lock();
-    ranges.emplace_back(a, b, size);
-    sum += size;
   }
+  // Here we sort all the anchor points across all files and go through them
+  // in sorted order to find partitioning boundaries.
+  // Not the most efficient implementation. A much more efficient algorithm
+  // probably exists, but it would be more complex. If performance turns out
+  // to be a problem, we can optimize.
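To make the partitioning step described above and implemented just below concrete, here is a small self-contained sketch (an editorial illustration, not part of the patch) that runs the same sort-then-threshold procedure on the File1/File2 example from the comment. The local `Anchor` struct, the plain `std::string` comparison standing in for the user comparator, and `max_subcompactions = 2` are all simplifications for the example.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Anchor {
  std::string user_key;
  uint64_t range_size;
};

int main() {
  // Anchors reported by File1 and File2 in the example above.
  std::vector<Anchor> all_anchors = {{"a1", 1000}, {"b1", 1200}, {"c1", 1100},
                                     {"a2", 1100}, {"b2", 1000}, {"c2", 1000}};
  const uint64_t max_subcompactions = 2;

  uint64_t total_size = 0;
  for (const auto& a : all_anchors) {
    total_size += a.range_size;  // 6400 in this example
  }

  // Sort all anchors across files by user key.
  std::sort(all_anchors.begin(), all_anchors.end(),
            [](const Anchor& a, const Anchor& b) {
              return a.user_key < b.user_key;
            });

  // Emit a boundary roughly every target_range_size bytes of cumulative
  // (approximate) data, capped at max_subcompactions - 1 boundaries.
  const uint64_t target_range_size = total_size / max_subcompactions;  // 3200
  std::vector<std::string> boundaries;
  uint64_t cumulative_size = 0;
  uint64_t next_threshold = target_range_size;
  for (const auto& anchor : all_anchors) {
    cumulative_size += anchor.range_size;
    if (cumulative_size > next_threshold) {
      next_threshold += target_range_size;
      boundaries.push_back(anchor.user_key);
    }
    if (boundaries.size() + 1 >= max_subcompactions) {
      break;
    }
  }

  for (const auto& b : boundaries) {
    std::cout << b << "\n";  // prints "b1", matching the worked example
  }
  return 0;
}

The real implementation additionally floors the target range size at MaxFileSizeForLevel() and returns early when a single range would cover the whole input; those details are omitted from this sketch.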
+ std::sort( + all_anchors.begin(), all_anchors.end(), + [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool { + return cfd_comparator->Compare(a.user_key, b.user_key) < 0; + }); // Group the ranges into subcompactions - const double min_file_fill_percent = 4.0 / 5; - int base_level = v->storage_info()->base_level(); - uint64_t max_output_files = static_cast(std::ceil( - sum / min_file_fill_percent / + uint64_t target_range_size = std::max( + total_size / static_cast(c->max_subcompactions()), MaxFileSizeForLevel( *(c->mutable_cf_options()), out_lvl, c->immutable_options()->compaction_style, base_level, - c->immutable_options()->level_compaction_dynamic_level_bytes))); - uint64_t subcompactions = - std::min({static_cast(ranges.size()), - static_cast(c->max_subcompactions()), - max_output_files}); - - if (subcompactions > 1) { - double mean = sum * 1.0 / subcompactions; - // Greedily add ranges to the subcompaction until the sum of the ranges' - // sizes becomes >= the expected mean size of a subcompaction - sum = 0; - for (size_t i = 0; i + 1 < ranges.size(); i++) { - sum += ranges[i].size; - if (subcompactions == 1) { - // If there's only one left to schedule then it goes to the end so no - // need to put an end boundary - continue; - } - if (sum >= mean) { - boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit)); - subcompactions--; - sum = 0; - } + c->immutable_options()->level_compaction_dynamic_level_bytes)); + + if (target_range_size >= total_size) { + return; + } + + uint64_t next_threshold = target_range_size; + uint64_t cumulative_size = 0; + for (TableReader::Anchor& anchor : all_anchors) { + cumulative_size += anchor.range_size; + if (cumulative_size > next_threshold) { + next_threshold += target_range_size; + boundaries_.push_back(anchor.user_key); + } + if (boundaries_.size() + 1 >= uint64_t{c->max_subcompactions()}) { + break; } } } @@ -885,8 +870,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // TODO: since we already use C++17, should use // std::optional instead. - const Slice* const start = sub_compact->start; - const Slice* const end = sub_compact->end; + const std::optional start = sub_compact->start; + const std::optional end = sub_compact->end; ReadOptions read_options; read_options.verify_checksums = true; @@ -900,19 +885,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // Note: if we're going to support subcompactions for user-defined timestamps, // the timestamp part will have to be stripped from the bounds here. - assert((!start && !end) || cfd->user_comparator()->timestamp_size() == 0); - read_options.iterate_lower_bound = start; - read_options.iterate_upper_bound = end; + assert((!start.has_value() && !end.has_value()) || + cfd->user_comparator()->timestamp_size() == 0); + if (start.has_value()) { + read_options.iterate_lower_bound = &start.value(); + } + if (end.has_value()) { + read_options.iterate_upper_bound = &end.value(); + } // Although the v2 aggregator is what the level iterator(s) know about, // the AddTombstones calls will be propagated down to the v1 aggregator. std::unique_ptr raw_input(versions_->MakeInputIterator( read_options, sub_compact->compaction, range_del_agg.get(), - file_options_for_read_, - (start == nullptr) ? std::optional{} - : std::optional{*start}, - (end == nullptr) ? 
std::optional{} - : std::optional{*end})); + file_options_for_read_, start, end)); InternalIterator* input = raw_input.get(); IterKey start_ikey; @@ -920,20 +906,21 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { Slice start_slice; Slice end_slice; - if (start) { - start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); + if (start.has_value()) { + start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber, + kValueTypeForSeek); start_slice = start_ikey.GetInternalKey(); } - if (end) { - end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek); + if (end.has_value()) { + end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek); end_slice = end_ikey.GetInternalKey(); } std::unique_ptr clip; - if (start || end) { + if (start.has_value() || end.has_value()) { clip = std::make_unique( - raw_input.get(), start ? &start_slice : nullptr, - end ? &end_slice : nullptr, &cfd->internal_comparator()); + raw_input.get(), start.has_value() ? &start_slice : nullptr, + end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator()); input = clip.get(); } @@ -1061,8 +1048,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. - assert(!end || - cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0); + assert(!end.has_value() || cfd->user_comparator()->Compare( + c_iter->user_key(), end.value()) < 0); if (c_iter_stats.num_input_records % kRecordStatsEvery == kRecordStatsEvery - 1) { @@ -1280,10 +1267,12 @@ Status CompactionJob::FinishCompactionOutputFile( // output_to_penultimate_level compaction here, as it's only used to decide // if range dels could be dropped. if (outputs.HasRangeDel()) { - s = outputs.AddRangeDels(sub_compact->start, sub_compact->end, - range_del_out_stats, bottommost_level_, - cfd->internal_comparator(), earliest_snapshot, - next_table_min_key); + s = outputs.AddRangeDels( + sub_compact->start.has_value() ? &(sub_compact->start.value()) + : nullptr, + sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr, + range_del_out_stats, bottommost_level_, cfd->internal_comparator(), + earliest_snapshot, next_table_min_key); } RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); @@ -1595,16 +1584,16 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact, } uint64_t current_time = static_cast(temp_current_time); InternalKey tmp_start, tmp_end; - if (sub_compact->start != nullptr) { - tmp_start.SetMinPossibleForUserKey(*(sub_compact->start)); + if (sub_compact->start.has_value()) { + tmp_start.SetMinPossibleForUserKey(sub_compact->start.value()); } - if (sub_compact->end != nullptr) { - tmp_end.SetMinPossibleForUserKey(*(sub_compact->end)); + if (sub_compact->end.has_value()) { + tmp_end.SetMinPossibleForUserKey(sub_compact->end.value()); } uint64_t oldest_ancester_time = sub_compact->compaction->MinInputFileOldestAncesterTime( - (sub_compact->start != nullptr) ? &tmp_start : nullptr, - (sub_compact->end != nullptr) ? &tmp_end : nullptr); + sub_compact->start.has_value() ? &tmp_start : nullptr, + sub_compact->end.has_value() ? 
&tmp_end : nullptr); if (oldest_ancester_time == std::numeric_limits::max()) { oldest_ancester_time = current_time; } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 2a342bddf..cbe913540 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -292,7 +292,7 @@ class CompactionJob { bool paranoid_file_checks_; bool measure_io_stats_; // Stores the Slices that designate the boundaries for each subcompaction - std::vector boundaries_; + std::vector boundaries_; Env::Priority thread_pri_; std::string full_history_ts_low_; std::string trim_ts_; diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index eeb936878..1d2e99d99 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -47,10 +47,10 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.db_options = BuildDBOptions(db_options_, mutable_db_options_copy_); compaction_input.snapshots = existing_snapshots_; - compaction_input.has_begin = sub_compact->start; + compaction_input.has_begin = sub_compact->start.has_value(); compaction_input.begin = compaction_input.has_begin ? sub_compact->start->ToString() : ""; - compaction_input.has_end = sub_compact->end; + compaction_input.has_end = sub_compact->end.has_value(); compaction_input.end = compaction_input.has_end ? sub_compact->end->ToString() : ""; @@ -264,8 +264,12 @@ Status CompactionServiceCompactionJob::Run() { Slice begin = compaction_input_.begin; Slice end = compaction_input_.end; compact_->sub_compact_states.emplace_back( - c, compaction_input_.has_begin ? &begin : nullptr, - compaction_input_.has_end ? &end : nullptr, /*sub_job_id*/ 0); + c, + compaction_input_.has_begin ? std::optional(begin) + : std::optional(), + compaction_input_.has_end ? std::optional(end) + : std::optional(), + /*sub_job_id*/ 0); log_buffer_->FlushBufferToLog(); LogCompaction(); diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index 6774ffd15..4570662d8 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -10,6 +10,8 @@ #pragma once +#include + #include "db/blob/blob_file_addition.h" #include "db/blob/blob_garbage_meter.h" #include "db/compaction/compaction.h" @@ -52,7 +54,7 @@ class SubcompactionState { // The boundaries of the key-range this compaction is interested in. No two // sub-compactions may have overlapping key-ranges. // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded - const Slice *start, *end; + const std::optional start, end; // The return status of this sub-compaction Status status; @@ -117,8 +119,8 @@ class SubcompactionState { SubcompactionState(const SubcompactionState&) = delete; SubcompactionState& operator=(const SubcompactionState&) = delete; - SubcompactionState(Compaction* c, Slice* _start, Slice* _end, - uint32_t _sub_job_id) + SubcompactionState(Compaction* c, const std::optional _start, + const std::optional _end, uint32_t _sub_job_id) : compaction(c), start(_start), end(_end), @@ -132,12 +134,12 @@ class SubcompactionState { // Invalid output_split_key indicates that we do not need to split if (output_split_key != nullptr) { // We may only split the output when the cursor is in the range. 
Split
-      if ((end == nullptr || icmp->user_comparator()->Compare(
-                                 ExtractUserKey(output_split_key->Encode()),
-                                 ExtractUserKey(*end)) < 0) &&
-          (start == nullptr || icmp->user_comparator()->Compare(
+      if ((!end.has_value() || icmp->user_comparator()->Compare(
                                    ExtractUserKey(output_split_key->Encode()),
-                                   ExtractUserKey(*start)) > 0)) {
+                                   ExtractUserKey(end.value())) < 0) &&
+          (!start.has_value() || icmp->user_comparator()->Compare(
+                                     ExtractUserKey(output_split_key->Encode()),
+                                     ExtractUserKey(start.value())) > 0)) {
         local_output_split_key_ = output_split_key;
       }
     }
diff --git a/db/table_cache.cc b/db/table_cache.cc
index e9ef7acf1..3356d3b19 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -527,6 +527,27 @@ Status TableCache::GetTableProperties(
   return s;
 }
 
+Status TableCache::ApproximateKeyAnchors(
+    const ReadOptions& ro, const InternalKeyComparator& internal_comparator,
+    const FileDescriptor& fd, std::vector<TableReader::Anchor>& anchors) {
+  Status s;
+  TableReader* t = fd.table_reader;
+  Cache::Handle* handle = nullptr;
+  if (t == nullptr) {
+    s = FindTable(ro, file_options_, internal_comparator, fd, &handle);
+    if (s.ok()) {
+      t = GetTableReaderFromHandle(handle);
+    }
+  }
+  if (s.ok() && t != nullptr) {
+    s = t->ApproximateKeyAnchors(ro, anchors);
+  }
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return s;
+}
+
 size_t TableCache::GetMemoryUsageByTableReader(
     const FileOptions& file_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
diff --git a/db/table_cache.h b/db/table_cache.h
index dc5c7a21e..d7a05200f 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -165,6 +165,11 @@ class TableCache {
       const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
       bool no_io = false);
 
+  Status ApproximateKeyAnchors(const ReadOptions& ro,
+                               const InternalKeyComparator& internal_comparator,
+                               const FileDescriptor& file_meta,
+                               std::vector<TableReader::Anchor>& anchors);
+
   // Return total memory usage of the table reader of the file.
   // 0 if table reader of the file is not loaded.
   size_t GetMemoryUsageByTableReader(
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index a67ca5906..b4640245a 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -2045,6 +2045,57 @@ void BlockBasedTable::FullFilterKeysMayMatch(
   }
 }
 
+Status BlockBasedTable::ApproximateKeyAnchors(const ReadOptions& read_options,
+                                              std::vector<Anchor>& anchors) {
+  // We iterate over the whole index block here. A more efficient
+  // implementation is possible if we push this operation into IndexReader.
+  // For example, we could directly sample from restart entries in the index
+  // block and only read the keys needed. Here we take a simple solution.
+  // Performance is unlikely to be a problem: we are compacting the whole
+  // file, so all keys will be read out anyway. An extra read of the index
+  // block should be a small share of the overhead. We can try to optimize
+  // if needed.
+  IndexBlockIter iiter_on_stack;
+  auto iiter = NewIndexIterator(
+      read_options, /*disable_prefix_seek=*/false, &iiter_on_stack,
+      /*get_context=*/nullptr, /*lookup_context=*/nullptr);
+  std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
+  if (iiter != &iiter_on_stack) {
+    iiter_unique_ptr.reset(iiter);
+  }
+
+  // If needed, the threshold could be made more adaptive. For example, it
+  // could be based on size, so that a larger file is sampled into more
+  // partitions than a smaller file.
The size might also need to be passed in by the caller based + // on total compaction size. + const uint64_t kMaxNumAnchors = uint64_t{128}; + uint64_t num_blocks = this->GetTableProperties()->num_data_blocks; + uint64_t num_blocks_per_anchor = num_blocks / kMaxNumAnchors; + if (num_blocks_per_anchor == 0) { + num_blocks_per_anchor = 1; + } + + uint64_t count = 0; + std::string last_key; + uint64_t range_size = 0; + uint64_t prev_offset = 0; + for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) { + const BlockHandle& bh = iiter->value().handle; + range_size += bh.offset() + bh.size() - prev_offset; + prev_offset = bh.offset() + bh.size(); + if (++count % num_blocks_per_anchor == 0) { + count = 0; + anchors.emplace_back(iiter->user_key(), range_size); + range_size = 0; + } else { + last_key = iiter->user_key().ToString(); + } + } + if (count != 0) { + anchors.emplace_back(last_key, range_size); + } + return Status::OK(); +} + Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, GetContext* get_context, const SliceTransform* prefix_extractor, diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index c232446b6..dc362b2f3 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -168,6 +168,9 @@ class BlockBasedTable : public TableReader { uint64_t ApproximateSize(const Slice& start, const Slice& end, TableReaderCaller caller) override; + Status ApproximateKeyAnchors(const ReadOptions& read_options, + std::vector& anchors) override; + bool TEST_BlockInCache(const BlockHandle& handle) const; // Returns true if the block for the specified key is in cache. diff --git a/table/table_reader.h b/table/table_reader.h index c1d98c143..4b4902f12 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -86,6 +86,20 @@ class TableReader { virtual uint64_t ApproximateSize(const Slice& start, const Slice& end, TableReaderCaller caller) = 0; + struct Anchor { + Anchor(const Slice& _user_key, size_t _range_size) + : user_key(_user_key.ToStringView()), range_size(_range_size) {} + std::string user_key; + size_t range_size; + }; + + // Now try to return approximately 128 anchor keys. + // The last one tends to be the largest key. + virtual Status ApproximateKeyAnchors(const ReadOptions& /*read_options*/, + std::vector& /*anchors*/) { + return Status::NotSupported("ApproximateKeyAnchors() not supported."); + } + // Set up the table for Compaction. 
Might change some parameters with
   // posix_fadvise
   virtual void SetupForCompaction() = 0;
diff --git a/table/table_test.cc b/table/table_test.cc
index 7e60a06b0..fc45ce347 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -41,6 +41,7 @@
 #include "rocksdb/filter_policy.h"
 #include "rocksdb/iterator.h"
 #include "rocksdb/memtablerep.h"
+#include "rocksdb/options.h"
 #include "rocksdb/perf_context.h"
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/statistics.h"
@@ -4047,6 +4048,57 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) {
   }
 }
 
+TEST_F(GeneralTableTest, ApproximateKeyAnchors) {
+  Random rnd(301);
+  TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
+  std::string tmp;
+  for (int i = 1000; i < 9000; i++) {
+    c.Add(std::to_string(i), rnd.RandomString(2000));
+  }
+  std::vector<std::string> keys;
+  stl_wrappers::KVMap kvmap;
+  Options options;
+  InternalKeyComparator ikc(options.comparator);
+  options.compression = kNoCompression;
+  BlockBasedTableOptions table_options;
+  table_options.block_size = 4096;
+  const ImmutableOptions ioptions(options);
+  const MutableCFOptions moptions(options);
+  c.Finish(options, ioptions, moptions, table_options, ikc, &keys, &kvmap);
+
+  std::vector<TableReader::Anchor> anchors;
+  ASSERT_OK(c.GetTableReader()->ApproximateKeyAnchors(ReadOptions(), anchors));
+  // The target is 128 anchors, but in reality it can be slightly more or
+  // fewer.
+  ASSERT_GT(anchors.size(), 120);
+  ASSERT_LT(anchors.size(), 140);
+
+  // We have around 8000 keys. With 128 anchors, that is on average 62.5 keys
+  // per anchor. Here we take a rough range and estimate that the distance
+  // between anchors is between 50 and 100.
+  // Total data size is about 18,000,000, so each anchor range is about
+  // 140,625. We also take a rough range.
+  int prev_num = 1000;
+  // Non-last anchor
+  for (size_t i = 0; i + 1 < anchors.size(); i++) {
+    auto& anchor = anchors[i];
+    ASSERT_GT(anchor.range_size, 100000);
+    ASSERT_LT(anchor.range_size, 200000);
+
+    // The key might be shortened, so pad '0' at the end if that is the case.
+    std::string key_cpy = anchor.user_key;
+    key_cpy.append(4 - key_cpy.size(), '0');
+    int num = std::stoi(key_cpy);
+    ASSERT_GT(num - prev_num, 50);
+    ASSERT_LT(num - prev_num, 100);
+    prev_num = num;
+  }
+
+  ASSERT_EQ("8999", anchors.back().user_key);
+  ASSERT_LT(anchors.back().range_size, 200000);
+
+  c.ResetTableReader();
+}
+
 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 TEST_P(ParameterizedHarnessTest, RandomizedHarnessTest) {
   Random rnd(test::RandomSeed() + 5);