Tiered Compaction: per key placement support (#9964)

Summary:
Support per_key_placement for last level compaction, which will
be used for tiered compaction.
* compaction iterator reports which level a key should output to;
* compaction get the output level information and check if it's safe to
  output the data to penultimate level;
* all compaction output files will be installed.
* extra internal compaction stats added for penultimate level.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9964

Test Plan:
* Unittest
* db_bench, no significate difference: https://gist.github.com/jay-zhuang/3645f8fb97ec0ab47c10704bb39fd6e4
* microbench manual compaction no significate difference: https://gist.github.com/jay-zhuang/ba679b3e89e24992615ee9eef310e6dd
* run the db_stress multiple times (not covering the new feature) looks good (internal: https://fburl.com/sandcastle/9w84pp2m)

Reviewed By: ajkr

Differential Revision: D36249494

Pulled By: jay-zhuang

fbshipit-source-id: a96da57c8031c1df83e4a7a8567b657a112b80a3
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent 7e1b417824
commit 6ce0b2ca34
  1. 5
      CMakeLists.txt
  2. 3
      Makefile
  3. 14
      TARGETS
  4. 83
      db/compaction/compaction.cc
  5. 70
      db/compaction/compaction.h
  6. 53
      db/compaction/compaction_iterator.cc
  7. 29
      db/compaction/compaction_iterator.h
  8. 126
      db/compaction/compaction_iterator_test.cc
  9. 1781
      db/compaction/compaction_job.cc
  10. 119
      db/compaction/compaction_job.h
  11. 88
      db/compaction/compaction_job_test.cc
  12. 314
      db/compaction/compaction_outputs.cc
  13. 328
      db/compaction/compaction_outputs.h
  14. 28
      db/compaction/compaction_picker.cc
  15. 3
      db/compaction/compaction_picker.h
  16. 189
      db/compaction/compaction_picker_test.cc
  17. 825
      db/compaction/compaction_service_job.cc
  18. 46
      db/compaction/compaction_state.cc
  19. 42
      db/compaction/compaction_state.h
  20. 223
      db/compaction/subcompaction_state.cc
  21. 255
      db/compaction/subcompaction_state.h
  22. 1253
      db/compaction/tiered_compaction_test.cc
  23. 165
      db/db_compaction_test.cc
  24. 158
      db/internal_stats.h
  25. 2
      include/rocksdb/compaction_job_stats.h
  26. 5
      src.mk

@ -627,7 +627,11 @@ set(SOURCES
db/compaction/compaction_picker_fifo.cc
db/compaction/compaction_picker_level.cc
db/compaction/compaction_picker_universal.cc
db/compaction/compaction_service_job.cc
db/compaction/compaction_state.cc
db/compaction/compaction_outputs.cc
db/compaction/sst_partitioner.cc
db/compaction/subcompaction_state.cc
db/convenience.cc
db/db_filesnapshot.cc
db/db_impl/compacted_db_impl.cc
@ -1231,6 +1235,7 @@ if(WITH_TESTS)
db/compaction/compaction_iterator_test.cc
db/compaction/compaction_picker_test.cc
db/compaction/compaction_service_test.cc
db/compaction/tiered_compaction_test.cc
db/comparator_db_test.cc
db/corruption_test.cc
db/cuckoo_table_db_test.cc

@ -1783,6 +1783,9 @@ write_unprepared_transaction_test: $(OBJ_DIR)/utilities/transactions/write_unpre
timestamped_snapshot_test: $(OBJ_DIR)/utilities/transactions/timestamped_snapshot_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
tiered_compaction_test: $(OBJ_DIR)/db/compaction/tiered_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
sst_dump: $(OBJ_DIR)/tools/sst_dump.o $(TOOLS_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -38,11 +38,15 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/compaction/compaction.cc",
"db/compaction/compaction_iterator.cc",
"db/compaction/compaction_job.cc",
"db/compaction/compaction_outputs.cc",
"db/compaction/compaction_picker.cc",
"db/compaction/compaction_picker_fifo.cc",
"db/compaction/compaction_picker_level.cc",
"db/compaction/compaction_picker_universal.cc",
"db/compaction/compaction_service_job.cc",
"db/compaction/compaction_state.cc",
"db/compaction/sst_partitioner.cc",
"db/compaction/subcompaction_state.cc",
"db/convenience.cc",
"db/db_filesnapshot.cc",
"db/db_impl/compacted_db_impl.cc",
@ -368,11 +372,15 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/compaction/compaction.cc",
"db/compaction/compaction_iterator.cc",
"db/compaction/compaction_job.cc",
"db/compaction/compaction_outputs.cc",
"db/compaction/compaction_picker.cc",
"db/compaction/compaction_picker_fifo.cc",
"db/compaction/compaction_picker_level.cc",
"db/compaction/compaction_picker_universal.cc",
"db/compaction/compaction_service_job.cc",
"db/compaction/compaction_state.cc",
"db/compaction/sst_partitioner.cc",
"db/compaction/subcompaction_state.cc",
"db/convenience.cc",
"db/db_filesnapshot.cc",
"db/db_impl/compacted_db_impl.cc",
@ -5764,6 +5772,12 @@ cpp_unittest_wrapper(name="thread_local_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="tiered_compaction_test",
srcs=["db/compaction/tiered_compaction_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="timer_queue_test",
srcs=["util/timer_queue_test.cc"],
deps=[":rocksdb_test_lib"],

@ -77,11 +77,11 @@ void Compaction::SetInputVersion(Version* _input_version) {
void Compaction::GetBoundaryKeys(
VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs, Slice* smallest_user_key,
Slice* largest_user_key) {
Slice* largest_user_key, int exclude_level) {
bool initialized = false;
const Comparator* ucmp = vstorage->InternalComparator()->user_comparator();
for (size_t i = 0; i < inputs.size(); ++i) {
if (inputs[i].files.empty()) {
if (inputs[i].files.empty() || inputs[i].level == exclude_level) {
continue;
}
if (inputs[i].level == 0) {
@ -257,7 +257,9 @@ Compaction::Compaction(
_blob_garbage_collection_age_cutoff < 0 ||
_blob_garbage_collection_age_cutoff > 1
? mutable_cf_options()->blob_garbage_collection_age_cutoff
: _blob_garbage_collection_age_cutoff) {
: _blob_garbage_collection_age_cutoff),
penultimate_level_(EvaluatePenultimateLevel(
immutable_options_, start_level_, output_level_)) {
MarkFilesBeingCompacted(true);
if (is_manual_compaction_) {
compaction_reason_ = CompactionReason::kManualCompaction;
@ -303,6 +305,18 @@ Compaction::Compaction(
}
}
}
PopulatePenultimateLevelOutputRange();
}
void Compaction::PopulatePenultimateLevelOutputRange() {
if (!SupportsPerKeyPlacement()) {
return;
}
GetBoundaryKeys(input_vstorage_, inputs_,
&penultimate_level_smallest_user_key_,
&penultimate_level_largest_user_key_, number_levels_ - 1);
}
Compaction::~Compaction() {
@ -314,6 +328,37 @@ Compaction::~Compaction() {
}
}
bool Compaction::SupportsPerKeyPlacement() const {
return penultimate_level_ != kInvalidLevel;
}
int Compaction::GetPenultimateLevel() const { return penultimate_level_; }
bool Compaction::OverlapPenultimateLevelOutputRange(
const Slice& smallest_key, const Slice& largest_key) const {
if (!SupportsPerKeyPlacement()) {
return false;
}
const Comparator* ucmp =
input_vstorage_->InternalComparator()->user_comparator();
return ucmp->Compare(smallest_key, penultimate_level_largest_user_key_) <=
0 &&
ucmp->Compare(largest_key, penultimate_level_smallest_user_key_) >= 0;
}
bool Compaction::WithinPenultimateLevelOutputRange(const Slice& key) const {
if (!SupportsPerKeyPlacement()) {
return false;
}
const Comparator* ucmp =
input_vstorage_->InternalComparator()->user_comparator();
return ucmp->Compare(key, penultimate_level_smallest_user_key_) >= 0 &&
ucmp->Compare(key, penultimate_level_largest_user_key_) <= 0;
}
bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_vstorage_->base_level();
bool matches =
@ -677,8 +722,36 @@ uint64_t Compaction::MinInputFileOldestAncesterTime(
return min_oldest_ancester_time;
}
int Compaction::GetInputBaseLevel() const {
return input_vstorage_->base_level();
int Compaction::EvaluatePenultimateLevel(
const ImmutableOptions& immutable_options, const int start_level,
const int output_level) {
// TODO: currently per_key_placement feature only support level and universal
// compaction
if (immutable_options.compaction_style != kCompactionStyleLevel &&
immutable_options.compaction_style != kCompactionStyleUniversal) {
return kInvalidLevel;
}
if (output_level != immutable_options.num_levels - 1) {
return kInvalidLevel;
}
int penultimate_level = output_level - 1;
assert(penultimate_level < immutable_options.num_levels);
if (penultimate_level <= 0 || penultimate_level < start_level) {
return kInvalidLevel;
}
// TODO: will add public like `options.preclude_last_level_data_seconds` for
// per_key_placement feature, will check that option here. Currently, only
// set by unittest
bool supports_per_key_placement = false;
TEST_SYNC_POINT_CALLBACK("Compaction::SupportsPerKeyPlacement:Enabled",
&supports_per_key_placement);
if (!supports_per_key_placement) {
return kInvalidLevel;
}
return penultimate_level;
}
} // namespace ROCKSDB_NAMESPACE

@ -302,7 +302,25 @@ class Compaction {
Slice GetLargestUserKey() const { return largest_user_key_; }
int GetInputBaseLevel() const;
// Return true if the compaction supports per_key_placement
bool SupportsPerKeyPlacement() const;
// Get per_key_placement penultimate output level, which is `last_level - 1`
// if per_key_placement feature is supported. Otherwise, return -1.
int GetPenultimateLevel() const;
// Return true if the given range is overlap with penultimate level output
// range.
bool OverlapPenultimateLevelOutputRange(const Slice& smallest_key,
const Slice& largest_key) const;
// Return true if the key is within penultimate level output range for
// per_key_placement feature, which is safe to place the key to the
// penultimate level. different compaction strategy has different rules.
// If per_key_placement is not supported, always return false.
// TODO: currently it doesn't support moving data from the last level to the
// penultimate level
bool WithinPenultimateLevelOutputRange(const Slice& key) const;
CompactionReason compaction_reason() const { return compaction_reason_; }
@ -339,6 +357,15 @@ class Compaction {
return notify_on_compaction_completion_;
}
static constexpr int kInvalidLevel = -1;
// Evaluate penultimate output level. If the compaction supports
// per_key_placement feature, it returns the penultimate level number.
// Otherwise, it's set to kInvalidLevel (-1), which means
// output_to_penultimate_level is not supported.
static int EvaluatePenultimateLevel(const ImmutableOptions& immutable_options,
const int start_level,
const int output_level);
private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
@ -346,7 +373,18 @@ class Compaction {
// get the smallest and largest key present in files to be compacted
static void GetBoundaryKeys(VersionStorageInfo* vstorage,
const std::vector<CompactionInputFiles>& inputs,
Slice* smallest_key, Slice* largest_key);
Slice* smallest_key, Slice* largest_key,
int exclude_level = -1);
// populate penultimate level output range, which will be used to determine if
// a key is safe to output to the penultimate level (details see
// `Compaction::WithinPenultimateLevelOutputRange()`.
// TODO: Currently the penultimate level output range is the min/max keys of
// non-last-level input files. Which is only good if there's no key moved
// from the last level to the penultimate level. For a more complicated per
// key placement which may move data from the last level to the penultimate
// level, it needs extra check.
void PopulatePenultimateLevelOutputRange();
// Get the atomic file boundaries for all files in the compaction. Necessary
// in order to avoid the scenario described in
@ -444,7 +482,35 @@ class Compaction {
// Blob garbage collection age cutoff.
double blob_garbage_collection_age_cutoff_;
// only set when per_key_placement feature is enabled, -1 (kInvalidLevel)
// means not supported.
const int penultimate_level_;
// Key range for penultimate level output
Slice penultimate_level_smallest_user_key_;
Slice penultimate_level_largest_user_key_;
};
#ifndef NDEBUG
// Helper struct only for tests, which contains the data to decide if a key
// should be output to the penultimate level.
// TODO: remove this when the public feature knob is available
struct PerKeyPlacementContext {
const int level;
const Slice key;
const Slice value;
const SequenceNumber seq_num;
bool output_to_penultimate_level;
PerKeyPlacementContext(int _level, Slice _key, Slice _value,
SequenceNumber _seq_num)
: level(_level), key(_key), value(_value), seq_num(_seq_num) {
output_to_penultimate_level = false;
}
};
#endif /* !NDEBUG */
// Return sum of sizes of all files in `files`.
extern uint64_t TotalFileSize(const std::vector<FileMetaData*>& files);

@ -1075,6 +1075,52 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
}
}
void CompactionIterator::DecideOutputLevel() {
#ifndef NDEBUG
// TODO: will be set by sequence number or key range, for now, it will only be
// set by unittest
PerKeyPlacementContext context(level_, ikey_.user_key, value_,
ikey_.sequence);
TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
&context);
output_to_penultimate_level_ = context.output_to_penultimate_level;
#endif /* !NDEBUG */
// if the key is within the earliest snapshot, it has to output to the
// penultimate level.
if (ikey_.sequence > earliest_snapshot_) {
output_to_penultimate_level_ = true;
}
if (output_to_penultimate_level_) {
// If it's decided to output to the penultimate level, but unsafe to do so,
// still output to the last level. For example, moving the data from a lower
// level to a higher level outside of the higher-level input key range is
// considered unsafe, because the key may conflict with higher-level SSTs
// not from this compaction.
// TODO: add statistic for declined output_to_penultimate_level
bool safe_to_penultimate_level =
compaction_->WithinPenultimateLevelOutputRange(ikey_.user_key);
if (!safe_to_penultimate_level) {
output_to_penultimate_level_ = false;
// It could happen when disable/enable `bottommost_temperature` while
// holding a snapshot. When `bottommost_temperature` is not set
// (==kUnknown), the data newer than any snapshot is pushed to the last
// level, but when the per_key_placement feature is enabled on the fly,
// the data later than the snapshot has to be moved to the penultimate
// level, which may or may not be safe. So the user needs to make sure all
// snapshot is released before enabling `bottommost_temperature` feature
// We will migrate the feature to `last_level_temperature` and maybe make
// it not dynamically changeable.
if (ikey_.sequence > earliest_snapshot_) {
status_ = Status::Corruption(
"Unsafe to store Seq later than snapshot in the last level if "
"per_key_placement is enabled");
}
}
}
}
void CompactionIterator::PrepareOutput() {
if (valid_) {
if (ikey_.type == kTypeValue) {
@ -1083,6 +1129,10 @@ void CompactionIterator::PrepareOutput() {
GarbageCollectBlobIfNeeded();
}
if (compaction_ != nullptr && compaction_->SupportsPerKeyPlacement()) {
DecideOutputLevel();
}
// Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
@ -1097,7 +1147,8 @@ void CompactionIterator::PrepareOutput() {
if (valid_ && compaction_ != nullptr &&
!compaction_->allow_ingest_behind() && bottommost_level_ &&
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_) {
ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_) {
if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL(

@ -105,6 +105,10 @@ class CompactionIterator {
virtual bool DoesInputReferenceBlobFiles() const = 0;
virtual const Compaction* real_compaction() const = 0;
virtual bool SupportsPerKeyPlacement() const = 0;
virtual bool WithinPenultimateLevelOutputRange(const Slice& key) const = 0;
};
class RealCompaction : public CompactionProxy {
@ -163,6 +167,16 @@ class CompactionIterator {
const Compaction* real_compaction() const override { return compaction_; }
bool SupportsPerKeyPlacement() const override {
return compaction_->SupportsPerKeyPlacement();
}
// Check if key is within penultimate level output range, to see if it's
// safe to output to the penultimate level for per_key_placement feature.
bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
return compaction_->WithinPenultimateLevelOutputRange(key);
}
private:
const Compaction* compaction_;
};
@ -227,6 +241,12 @@ class CompactionIterator {
const Slice& user_key() const { return current_user_key_; }
const CompactionIterationStats& iter_stats() const { return iter_stats_; }
uint64_t num_input_entry_scanned() const { return input_.num_itered(); }
// If the current key should be placed on penultimate level, only valid if
// per_key_placement is supported
bool output_to_penultimate_level() const {
return output_to_penultimate_level_;
}
Status InputStatus() const { return input_.status(); }
private:
// Processes the input stream to find the next output
@ -235,6 +255,10 @@ class CompactionIterator {
// Do final preparations before presenting the output to the callee.
void PrepareOutput();
// Decide the current key should be output to the last level or penultimate
// level, only call for compaction supports per key placement
void DecideOutputLevel();
// Passes the output value to the blob file builder (if any), and replaces it
// with the corresponding blob reference if it has been actually written to a
// blob file (i.e. if it passed the value size check). Returns true if the
@ -417,6 +441,11 @@ class CompactionIterator {
// just been zeroed out during bottommost compaction.
bool last_key_seq_zeroed_{false};
// True if the current key should be output to the penultimate level if
// possible, compaction logic makes the final decision on which level to
// output to.
bool output_to_penultimate_level_{false};
void AdvanceInputIter() { input_.Next(); }
void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }

@ -180,11 +180,21 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
const Compaction* real_compaction() const override { return nullptr; }
bool SupportsPerKeyPlacement() const override {
return supports_per_key_placement;
}
bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
return (!key.starts_with("unsafe_pb"));
}
bool key_not_exists_beyond_output_level = false;
bool is_bottommost_level = false;
bool is_allow_ingest_behind = false;
bool supports_per_key_placement = false;
};
// A simplified snapshot checker which assumes each snapshot has a global
@ -254,6 +264,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
compaction_proxy_->key_not_exists_beyond_output_level =
key_not_exists_beyond_output_level;
compaction_proxy_->supports_per_key_placement = SupportsPerKeyPlacement();
compaction.reset(compaction_proxy_);
}
bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
@ -295,6 +306,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
virtual bool AllowIngestBehind() const { return false; }
virtual bool SupportsPerKeyPlacement() const { return false; }
void RunTest(
const std::vector<std::string>& input_keys,
const std::vector<std::string>& input_values,
@ -756,6 +769,119 @@ TEST_P(CompactionIteratorTest, ConvertToPutAtBottom) {
INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
testing::Values(true, false));
class PerKeyPlacementCompIteratorTest : public CompactionIteratorTest {
public:
bool SupportsPerKeyPlacement() const override { return true; }
};
TEST_P(PerKeyPlacementCompIteratorTest, SplitLastLevelData) {
std::atomic_uint64_t latest_cold_seq = 0;
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
});
SyncPoint::GetInstance()->EnableProcessing();
latest_cold_seq = 5;
InitIterators(
{test::KeyStr("a", 7, kTypeValue), test::KeyStr("b", 6, kTypeValue),
test::KeyStr("c", 5, kTypeValue)},
{"vala", "valb", "valc"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
nullptr, nullptr, true);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
// the first 2 keys are hot, which should has
// `output_to_penultimate_level()==true` and seq num not zeroed out
ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
ASSERT_TRUE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("b", 6, kTypeValue), c_iter_->key().ToString());
ASSERT_TRUE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
// `a` is cold data, which should be output to bottommost
ASSERT_EQ(test::KeyStr("c", 0, kTypeValue), c_iter_->key().ToString());
ASSERT_FALSE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(PerKeyPlacementCompIteratorTest, SnapshotData) {
AddSnapshot(5);
InitIterators(
{test::KeyStr("a", 7, kTypeValue), test::KeyStr("b", 6, kTypeDeletion),
test::KeyStr("b", 5, kTypeValue)},
{"vala", "", "valb"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
nullptr, nullptr, true);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
// The first key and the tombstone are within snapshot, which should output
// to the penultimate level (and seq num cannot be zeroed out).
ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
ASSERT_TRUE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("b", 6, kTypeDeletion), c_iter_->key().ToString());
ASSERT_TRUE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_TRUE(c_iter_->Valid());
// `a` is not protected by the snapshot, the sequence number is zero out and
// should output bottommost
ASSERT_EQ(test::KeyStr("b", 0, kTypeValue), c_iter_->key().ToString());
ASSERT_FALSE(c_iter_->output_to_penultimate_level());
c_iter_->Next();
ASSERT_OK(c_iter_->status());
ASSERT_FALSE(c_iter_->Valid());
}
TEST_P(PerKeyPlacementCompIteratorTest, ConflictWithSnapshot) {
std::atomic_uint64_t latest_cold_seq = 0;
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
});
SyncPoint::GetInstance()->EnableProcessing();
latest_cold_seq = 6;
AddSnapshot(5);
InitIterators({test::KeyStr("a", 7, kTypeValue),
test::KeyStr("unsafe_pb", 6, kTypeValue),
test::KeyStr("c", 5, kTypeValue)},
{"vala", "valb", "valc"}, {}, {}, kMaxSequenceNumber,
kMaxSequenceNumber, nullptr, nullptr, true);
c_iter_->SeekToFirst();
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
ASSERT_TRUE(c_iter_->output_to_penultimate_level());
// the 2nd key is unsafe to output_to_penultimate_level, but it's within
// snapshot so for per_key_placement feature it has to be outputted to the
// penultimate level. which is a corruption. We should never see
// such case as the data with seq num (within snapshot) should always come
// from higher compaction input level, which makes it safe to
// output_to_penultimate_level.
c_iter_->Next();
ASSERT_TRUE(c_iter_->status().IsCorruption());
}
INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompIteratorTest,
PerKeyPlacementCompIteratorTest,
testing::Values(true, false));
// Tests how CompactionIterator work together with SnapshotChecker.
class CompactionIteratorWithSnapshotCheckerTest
: public CompactionIteratorTest {

File diff suppressed because it is too large Load Diff

@ -20,6 +20,7 @@
#include "db/blob/blob_file_completion_callback.h"
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
@ -47,6 +48,7 @@
namespace ROCKSDB_NAMESPACE {
class Arena;
class CompactionState;
class ErrorHandler;
class MemTable;
class SnapshotChecker;
@ -56,11 +58,91 @@ class Version;
class VersionEdit;
class VersionSet;
class SubcompactionState;
// CompactionJob is responsible for executing the compaction. Each (manual or
// automated) compaction corresponds to a CompactionJob object, and usually
// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
// will divide the compaction into subcompactions and execute them in parallel
// if needed.
//
// CompactionJob has 2 main stats:
// 1. CompactionJobStats compaction_job_stats_
// CompactionJobStats is a public data structure which is part of Compaction
// event listener that rocksdb share the job stats with the user.
// Internally it's an aggregation of all the compaction_job_stats from each
// `SubcompactionState`:
// +------------------------+
// | SubcompactionState |
// | |
// +--------->| compaction_job_stats |
// | | |
// | +------------------------+
// +------------------------+ |
// | CompactionJob | | +------------------------+
// | | | | SubcompactionState |
// | compaction_job_stats +-----+ | |
// | | +--------->| compaction_job_stats |
// | | | | |
// +------------------------+ | +------------------------+
// |
// | +------------------------+
// | | SubcompactionState |
// | | |
// +--------->+ compaction_job_stats |
// | | |
// | +------------------------+
// |
// | +------------------------+
// | | ... |
// +--------->+ |
// +------------------------+
//
// 2. CompactionStatsFull compaction_stats_
// `CompactionStatsFull` is an internal stats about the compaction, which
// is eventually sent to `ColumnFamilyData::internal_stats_` and used for
// logging and public metrics.
// Internally, it's an aggregation of stats_ from each `SubcompactionState`.
// It has 2 parts, normal stats about the main compaction information and
// the penultimate level output stats.
// `SubcompactionState` maintains the CompactionOutputs for normal output and
// the penultimate level output if exists, the per_level stats is
// stored with the outputs.
// +---------------------------+
// | SubcompactionState |
// | |
// | +----------------------+ |
// | | CompactionOutputs | |
// | | (normal output) | |
// +---->| stats_ | |
// | | +----------------------+ |
// | | |
// | | +----------------------+ |
// +--------------------------------+ | | | CompactionOutputs | |
// | CompactionJob | | | | (penultimate_level) | |
// | | +--------->| stats_ | |
// | compaction_stats_ | | | | +----------------------+ |
// | +-------------------------+ | | | | |
// | |stats (normal) |------|----+ +---------------------------+
// | +-------------------------+ | | |
// | | | |
// | +-------------------------+ | | | +---------------------------+
// | |penultimate_level_stats +------+ | | SubcompactionState |
// | +-------------------------+ | | | | |
// | | | | | +----------------------+ |
// | | | | | | CompactionOutputs | |
// +--------------------------------+ | | | | (normal output) | |
// | +---->| stats_ | |
// | | +----------------------+ |
// | | |
// | | +----------------------+ |
// | | | CompactionOutputs | |
// | | | (penultimate_level) | |
// +--------->| stats_ | |
// | +----------------------+ |
// | |
// +---------------------------+
class CompactionJob {
public:
CompactionJob(
@ -107,11 +189,6 @@ class CompactionJob {
IOStatus io_status() const { return io_status_; }
protected:
struct SubcompactionState;
// CompactionJob state
struct CompactionState;
void AggregateStatistics();
void UpdateCompactionStats();
void LogCompaction();
virtual void RecordCompactionIOStats();
@ -122,7 +199,7 @@ class CompactionJob {
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
CompactionState* compact_;
InternalStats::CompactionStats compaction_stats_;
InternalStats::CompactionStatsFull compaction_stats_;
const ImmutableDBOptions& db_options_;
const MutableDBOptions mutable_db_options_copy_;
LogBuffer* log_buffer_;
@ -135,6 +212,8 @@ class CompactionJob {
IOStatus io_status_;
CompactionJobStats* compaction_job_stats_;
private:
friend class CompactionJobTestBase;
@ -150,15 +229,14 @@ class CompactionJob {
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
Status FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const Slice* next_table_min_key = nullptr);
Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact,
CompactionOutputs& outputs,
const Slice& next_table_min_key);
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs);
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
@ -167,20 +245,12 @@ class CompactionJob {
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
#ifndef ROCKSDB_LITE
void BuildSubcompactionJobInfo(
SubcompactionState* sub_compact,
SubcompactionJobInfo* subcompaction_job_info) const;
#endif // ROCKSDB_LITE
void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);
void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
uint32_t job_id_;
CompactionJobStats* compaction_job_stats_;
// DBImpl state
const std::string& dbname_;
const std::string db_id_;
@ -222,14 +292,12 @@ class CompactionJob {
bool measure_io_stats_;
// Stores the Slices that designate the boundaries for each subcompaction
std::vector<Slice> boundaries_;
// Stores the approx size of keys covered in the range of each subcompaction
std::vector<uint64_t> sizes_;
Env::Priority thread_pri_;
std::string full_history_ts_low_;
std::string trim_ts_;
BlobFileCompletionCallback* blob_callback_;
uint64_t GetCompactionId(SubcompactionState* sub_compact);
uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
@ -265,7 +333,6 @@ struct CompactionServiceInput {
std::string begin;
bool has_end = false;
std::string end;
uint64_t approx_size = 0;
// serialization interface to read and write the object
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
@ -357,7 +424,7 @@ class CompactionServiceCompactionJob : private CompactionJob {
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
const std::string& output_path,
std::string output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result);

@ -482,6 +482,17 @@ class CompactionJobTestBase : public testing::Test {
cfd_ = versions_->GetColumnFamilySet()->GetDefault();
}
void RunLastLevelCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
std::function<void(Compaction& comp)>&& verify_func,
const std::vector<SequenceNumber>& snapshots = {}) {
const int kLastLevel = cf_options_.num_levels - 1;
verify_per_key_placement_ = std::move(verify_func);
mock::KVVector empty_map;
RunCompaction(input_files, empty_map, snapshots, kMaxSequenceNumber,
kLastLevel, false);
}
void RunCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
const mock::KVVector& expected_results,
@ -571,6 +582,12 @@ class CompactionJobTestBase : public testing::Test {
if (check_get_priority) {
CheckGetRateLimiterPriority(compaction_job);
}
if (verify_per_key_placement_) {
// Verify per_key_placement compaction
assert(compaction.SupportsPerKeyPlacement());
verify_per_key_placement_(compaction);
}
}
void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
@ -620,6 +637,7 @@ class CompactionJobTestBase : public testing::Test {
std::string full_history_ts_low_;
const std::function<std::string(uint64_t)> encode_u64_ts_;
bool test_io_priority_;
std::function<void(Compaction& comp)> verify_per_key_placement_;
};
// TODO(icanadi) Make it simpler once we mock out VersionSet
@ -1311,6 +1329,75 @@ TEST_F(CompactionJobTest, OldestBlobFileNumber) {
/* expected_oldest_blob_file_number */ 19);
}
TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) {
cf_options_.bottommost_temperature = Temperature::kCold;
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = true;
});
std::atomic_uint64_t latest_cold_seq = 0;
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput.context", [&](void* arg) {
auto context = static_cast<PerKeyPlacementContext*>(arg);
context->output_to_penultimate_level =
context->seq_num > latest_cold_seq;
});
SyncPoint::GetInstance()->EnableProcessing();
NewDB();
// Add files on different levels that may overlap
auto file0_1 = mock::MakeMockFile({{KeyStr("z", 12U, kTypeValue), "val"}});
AddMockFile(file0_1);
auto file1_1 = mock::MakeMockFile({{KeyStr("b", 10U, kTypeValue), "val"},
{KeyStr("f", 11U, kTypeValue), "val"}});
AddMockFile(file1_1, 1);
auto file1_2 = mock::MakeMockFile({{KeyStr("j", 12U, kTypeValue), "val"},
{KeyStr("k", 13U, kTypeValue), "val"}});
AddMockFile(file1_2, 1);
auto file1_3 = mock::MakeMockFile({{KeyStr("p", 14U, kTypeValue), "val"},
{KeyStr("u", 15U, kTypeValue), "val"}});
AddMockFile(file1_3, 1);
auto file2_1 = mock::MakeMockFile({{KeyStr("f", 8U, kTypeValue), "val"},
{KeyStr("h", 9U, kTypeValue), "val"}});
AddMockFile(file2_1, 2);
auto file2_2 = mock::MakeMockFile({{KeyStr("m", 6U, kTypeValue), "val"},
{KeyStr("p", 7U, kTypeValue), "val"}});
AddMockFile(file2_2, 2);
auto file3_1 = mock::MakeMockFile({{KeyStr("g", 2U, kTypeValue), "val"},
{KeyStr("k", 3U, kTypeValue), "val"}});
AddMockFile(file3_1, 3);
auto file3_2 = mock::MakeMockFile({{KeyStr("v", 4U, kTypeValue), "val"},
{KeyStr("x", 5U, kTypeValue), "val"}});
AddMockFile(file3_2, 3);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto files0 = cfd->current()->storage_info()->LevelFiles(0);
auto files1 = cfd->current()->storage_info()->LevelFiles(1);
auto files2 = cfd->current()->storage_info()->LevelFiles(2);
auto files3 = cfd->current()->storage_info()->LevelFiles(3);
RunLastLevelCompaction(
{files0, files1, files2, files3}, /*verify_func=*/[&](Compaction& comp) {
for (char c = 'a'; c <= 'z'; c++) {
std::string c_str;
c_str = c;
const Slice key(c_str);
if (c == 'a') {
ASSERT_FALSE(comp.WithinPenultimateLevelOutputRange(key));
} else {
ASSERT_TRUE(comp.WithinPenultimateLevelOutputRange(key));
}
}
});
}
TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
db_options_.enforce_single_del_contracts = false;
NewDB();
@ -1360,7 +1447,6 @@ TEST_F(CompactionJobTest, InputSerialization) {
if (input.has_end) {
input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
}
input.approx_size = rnd64.Uniform(UINT64_MAX);
std::string output;
ASSERT_OK(input.Write(&output));

@ -0,0 +1,314 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction_outputs.h"
#include "db/builder.h"
namespace ROCKSDB_NAMESPACE {
void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) {
builder_.reset(NewTableBuilder(tboptions, file_writer_.get()));
}
Status CompactionOutputs::Finish(const Status& intput_status) {
FileMetaData* meta = GetMetaData();
assert(meta != nullptr);
Status s = intput_status;
if (s.ok()) {
s = builder_->Finish();
} else {
builder_->Abandon();
}
Status io_s = builder_->io_status();
if (s.ok()) {
s = io_s;
} else {
io_s.PermitUncheckedError();
}
const uint64_t current_bytes = builder_->FileSize();
if (s.ok()) {
meta->fd.file_size = current_bytes;
meta->marked_for_compaction = builder_->NeedCompact();
}
current_output().finished = true;
stats_.bytes_written += current_bytes;
stats_.num_output_files = outputs_.size();
return s;
}
IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status,
SystemClock* clock,
Statistics* statistics,
bool use_fsync) {
IOStatus io_s;
if (input_status.ok()) {
StopWatch sw(clock, statistics, COMPACTION_OUTFILE_SYNC_MICROS);
io_s = file_writer_->Sync(use_fsync);
}
if (input_status.ok() && io_s.ok()) {
io_s = file_writer_->Close();
}
if (input_status.ok() && io_s.ok()) {
FileMetaData* meta = GetMetaData();
meta->file_checksum = file_writer_->GetFileChecksum();
meta->file_checksum_func_name = file_writer_->GetFileChecksumFuncName();
}
file_writer_.reset();
return io_s;
}
Status CompactionOutputs::AddToOutput(
const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
Status s;
const Slice& key = c_iter.key();
if (!pending_close_ && c_iter.Valid() && partitioner_ && HasBuilder() &&
partitioner_->ShouldPartition(
PartitionerRequest(last_key_for_partitioner_, c_iter.user_key(),
current_output_file_size_)) == kRequired) {
pending_close_ = true;
}
if (pending_close_) {
s = close_file_func(*this, c_iter.InputStatus(), key);
pending_close_ = false;
}
if (!s.ok()) {
return s;
}
// Open output file if necessary
if (!HasBuilder()) {
s = open_file_func(*this);
}
if (!s.ok()) {
return s;
}
Output& curr = current_output();
assert(builder_ != nullptr);
const Slice& value = c_iter.value();
s = curr.validator.Add(key, value);
if (!s.ok()) {
return s;
}
builder_->Add(key, value);
stats_.num_output_records++;
current_output_file_size_ = builder_->EstimatedFileSize();
if (blob_garbage_meter_) {
s = blob_garbage_meter_->ProcessOutFlow(key, value);
}
if (!s.ok()) {
return s;
}
const ParsedInternalKey& ikey = c_iter.ikey();
s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
ikey.type);
// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
//
// TODO(aekmekji): determine if file should be closed earlier than this
// during subcompactions (i.e. if output size, estimated by input size, is
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
if (compaction_->output_level() != 0 &&
current_output_file_size_ >= compaction_->max_output_file_size()) {
pending_close_ = true;
}
if (partitioner_) {
last_key_for_partitioner_.assign(c_iter.user_key().data_,
c_iter.user_key().size_);
}
return s;
}
Status CompactionOutputs::AddRangeDels(
const Slice* comp_start, const Slice* comp_end,
CompactionIterationStats& range_del_out_stats, bool bottommost_level,
const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
const Slice& next_table_min_key) {
assert(HasRangeDel());
FileMetaData& meta = current_output().meta;
const Comparator* ucmp = icmp.user_comparator();
Slice lower_bound_guard, upper_bound_guard;
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
bool lower_bound_from_sub_compact = false;
size_t output_size = outputs_.size();
if (output_size == 1) {
// For the first output table, include range tombstones before the min
// key but after the subcompaction boundary.
lower_bound = comp_start;
lower_bound_from_sub_compact = true;
} else if (meta.smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
// tombstones falling before min key.
smallest_user_key = meta.smallest.user_key().ToString(false /*hex*/);
lower_bound_guard = Slice(smallest_user_key);
lower_bound = &lower_bound_guard;
} else {
lower_bound = nullptr;
}
if (!next_table_min_key.empty()) {
// This may be the last file in the subcompaction in some cases, so we
// need to compare the end key of subcompaction with the next file start
// key. When the end key is chosen by the subcompaction, we know that
// it must be the biggest key in output file. Therefore, it is safe to
// use the smaller key as the upper bound of the output file, to ensure
// that there is no overlapping between different output files.
upper_bound_guard = ExtractUserKey(next_table_min_key);
if (comp_end != nullptr &&
ucmp->Compare(upper_bound_guard, *comp_end) >= 0) {
upper_bound = comp_end;
} else {
upper_bound = &upper_bound_guard;
}
} else {
// This is the last file in the subcompaction, so extend until the
// subcompaction ends.
upper_bound = comp_end;
}
bool has_overlapping_endpoints;
if (upper_bound != nullptr && meta.largest.size() > 0) {
has_overlapping_endpoints =
ucmp->Compare(meta.largest.user_key(), *upper_bound) == 0;
} else {
has_overlapping_endpoints = false;
}
// The end key of the subcompaction must be bigger or equal to the upper
// bound. If the end of subcompaction is null or the upper bound is null,
// it means that this file is the last file in the compaction. So there
// will be no overlapping between this file and others.
assert(comp_end == nullptr || upper_bound == nullptr ||
ucmp->Compare(*upper_bound, *comp_end) <= 0);
auto it = range_del_agg_->NewIterator(lower_bound, upper_bound,
has_overlapping_endpoints);
// Position the range tombstone output iterator. There may be tombstone
// fragments that are entirely out of range, so make sure that we do not
// include those.
if (lower_bound != nullptr) {
it->Seek(*lower_bound);
} else {
it->SeekToFirst();
}
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
if (upper_bound != nullptr) {
int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
if ((has_overlapping_endpoints && cmp < 0) ||
(!has_overlapping_endpoints && cmp <= 0)) {
// Tombstones starting after upper_bound only need to be included in
// the next table. If the current SST ends before upper_bound, i.e.,
// `has_overlapping_endpoints == false`, we can also skip over range
// tombstones that start exactly at upper_bound. Such range
// tombstones will be included in the next file and are not relevant
// to the point keys or endpoints of the current file.
break;
}
}
if (bottommost_level && tombstone.seq_ <= earliest_snapshot) {
// TODO(andrewkr): tombstones that span multiple output files are
// counted for each compaction output file, so lots of double
// counting.
range_del_out_stats.num_range_del_drop_obsolete++;
range_del_out_stats.num_record_drop_obsolete++;
continue;
}
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0);
// Range tombstone is not supported by output validator yet.
builder_->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
// Pretend the smallest key has the same user key as lower_bound
// (the max key in the previous table or subcompaction) in order for
// files to appear key-space partitioned.
//
// When lower_bound is chosen by a subcompaction, we know that
// subcompactions over smaller keys cannot contain any keys at
// lower_bound. We also know that smaller subcompactions exist,
// because otherwise the subcompaction woud be unbounded on the left.
// As a result, we know that no other files on the output level will
// contain actual keys at lower_bound (an output file may have a
// largest key of lower_bound@kMaxSequenceNumber, but this only
// indicates a large range tombstone was truncated). Therefore, it is
// safe to use the tombstone's sequence number, to ensure that keys at
// lower_bound at lower levels are covered by truncated tombstones.
//
// If lower_bound was chosen by the smallest data key in the file,
// choose lowest seqnum so this file's smallest internal key comes
// after the previous file's largest. The fake seqnum is OK because
// the read path's file-picking code only considers user key.
smallest_candidate = InternalKey(
*lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
kTypeRangeDeletion);
}
InternalKey largest_candidate = tombstone.SerializeEndKey();
if (upper_bound != nullptr &&
ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
// Pretend the largest key has the same user key as upper_bound (the
// min key in the following table or subcompaction) in order for files
// to appear key-space partitioned.
//
// Choose highest seqnum so this file's largest internal key comes
// before the next file's/subcompaction's smallest. The fake seqnum is
// OK because the read path's file-picking code only considers the
// user key portion.
//
// Note Seek() also creates InternalKey with (user_key,
// kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
// kTypeRangeDeletion (0xF), so the range tombstone comes before the
// Seek() key in InternalKey's ordering. So Seek() will look in the
// next file for the user key.
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
}
#ifndef NDEBUG
SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
if (meta.smallest.size() > 0) {
smallest_ikey_seqnum = GetInternalKeySeqno(meta.smallest.Encode());
}
#endif
meta.UpdateBoundariesForRange(smallest_candidate, largest_candidate,
tombstone.seq_, icmp);
// The smallest key in a file is used for range tombstone truncation, so
// it cannot have a seqnum of 0 (unless the smallest data key in a file
// has a seqnum of 0). Otherwise, the truncated tombstone may expose
// deleted keys at lower levels.
assert(smallest_ikey_seqnum == 0 ||
ExtractInternalKeyFooter(meta.smallest.Encode()) !=
PackSequenceAndType(0, kTypeRangeDeletion));
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,328 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
namespace ROCKSDB_NAMESPACE {
class CompactionOutputs;
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
using CompactionFileCloseFunc =
std::function<Status(CompactionOutputs&, const Status&, const Slice&)>;
// Files produced by subcompaction, most of the functions are used by
// compaction_job Open/Close compaction file functions.
class CompactionOutputs {
public:
// compaction output file
struct Output {
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
bool _enable_order_check, bool _enable_hash, bool _finished,
uint64_t precalculated_hash)
: meta(std::move(_meta)),
validator(_icmp, _enable_order_check, _enable_hash,
precalculated_hash),
finished(_finished) {}
FileMetaData meta;
OutputValidator validator;
bool finished;
std::shared_ptr<const TableProperties> table_properties;
};
CompactionOutputs() = delete;
explicit CompactionOutputs(const Compaction* compaction,
const bool is_penultimate_level)
: compaction_(compaction), is_penultimate_level_(is_penultimate_level) {
partitioner_ = compaction->output_level() == 0
? nullptr
: compaction->CreateSstPartitioner();
}
// Add generated output to the list
void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
bool enable_order_check, bool enable_hash,
bool finished = false, uint64_t precalculated_hash = 0) {
outputs_.emplace_back(std::move(meta), icmp, enable_order_check,
enable_hash, finished, precalculated_hash);
}
// Set new table builder for the current output
void NewBuilder(const TableBuilderOptions& tboptions);
// Assign a new WritableFileWriter to the current output
void AssignFileWriter(WritableFileWriter* writer) {
file_writer_.reset(writer);
}
// TODO: Remove it when remote compaction support tiered compaction
void SetTotalBytes(uint64_t bytes) { stats_.bytes_written += bytes; }
void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }
// TODO: Move the BlobDB builder into CompactionOutputs
const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
if (is_penultimate_level_) {
assert(blob_file_additions_.empty());
}
return blob_file_additions_;
}
std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
assert(!is_penultimate_level_);
return &blob_file_additions_;
}
bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
BlobGarbageMeter* CreateBlobGarbageMeter() {
assert(!is_penultimate_level_);
blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
return blob_garbage_meter_.get();
}
BlobGarbageMeter* GetBlobGarbageMeter() const {
if (is_penultimate_level_) {
// blobdb doesn't support per_key_placement yet
assert(blob_garbage_meter_ == nullptr);
return nullptr;
}
return blob_garbage_meter_.get();
}
void UpdateBlobStats() {
assert(!is_penultimate_level_);
stats_.num_output_files_blob = blob_file_additions_.size();
for (const auto& blob : blob_file_additions_) {
stats_.bytes_written_blob += blob.GetTotalBlobBytes();
}
}
// Finish the current output file
Status Finish(const Status& intput_status);
// Update output table properties from table builder
void UpdateTableProperties() {
current_output().table_properties =
std::make_shared<TableProperties>(GetTableProperties());
}
IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
Statistics* statistics, bool use_fsync);
TableProperties GetTableProperties() {
return builder_->GetTableProperties();
}
Slice SmallestUserKey() const {
if (!outputs_.empty() && outputs_[0].finished) {
return outputs_[0].meta.smallest.user_key();
} else {
return Slice{nullptr, 0};
}
}
Slice LargestUserKey() const {
if (!outputs_.empty() && outputs_.back().finished) {
return outputs_.back().meta.largest.user_key();
} else {
return Slice{nullptr, 0};
}
}
// In case the last output file is empty, which doesn't need to keep.
void RemoveLastEmptyOutput() {
if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
// An error occurred, so ignore the last output.
outputs_.pop_back();
}
}
// Remove the last output, for example the last output doesn't have data (no
// entry and no range-dels), but file_size might not be 0, as it has SST
// metadata.
void RemoveLastOutput() {
assert(!outputs_.empty());
outputs_.pop_back();
}
bool HasBuilder() const { return builder_ != nullptr; }
FileMetaData* GetMetaData() { return &current_output().meta; }
bool HasOutput() const { return !outputs_.empty(); }
uint64_t NumEntries() const { return builder_->NumEntries(); }
void ResetBuilder() {
builder_.reset();
current_output_file_size_ = 0;
}
// Add range-dels from the aggregator to the current output file
Status AddRangeDels(const Slice* comp_start, const Slice* comp_end,
CompactionIterationStats& range_del_out_stats,
bool bottommost_level, const InternalKeyComparator& icmp,
SequenceNumber earliest_snapshot,
const Slice& next_table_min_key);
// Is the current file is already pending for close
bool IsPendingClose() const { return pending_close_; }
// Current file should close before adding a new key
void SetPendingClose() { pending_close_ = true; }
// if the outputs have range delete, range delete is also data
bool HasRangeDel() const {
return range_del_agg_ && !range_del_agg_->IsEmpty();
}
private:
friend class SubcompactionState;
void Cleanup() {
if (builder_ != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
builder_->Abandon();
builder_.reset();
}
}
uint64_t GetCurrentOutputFileSize() const {
return current_output_file_size_;
}
// Add curent key from compaction_iterator to the output file. If needed
// close and open new compaction output with the functions provided.
Status AddToOutput(const CompactionIterator& c_iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func);
// Close the current output. `open_file_func` is needed for creating new file
// for range-dels only output file.
Status CloseOutput(const Status& curr_status,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
Status status = curr_status;
// handle subcompaction containing only range deletions
if (status.ok() && !HasBuilder() && !HasOutput() && HasRangeDel()) {
status = open_file_func(*this);
}
if (HasBuilder()) {
const Slice empty_key{};
Status s = close_file_func(*this, status, empty_key);
if (!s.ok() && status.ok()) {
status = s;
}
}
return status;
}
// This subcompaction's output could be empty if compaction was aborted before
// this subcompaction had a chance to generate any output files. When
// subcompactions are executed sequentially this is more likely and will be
// particularly likely for the later subcompactions to be empty. Once they are
// run in parallel however it should be much rarer.
// It's caller's responsibility to make sure it's not empty.
Output& current_output() {
assert(!outputs_.empty());
return outputs_.back();
}
// Assign the range_del_agg to the target output level. There's only one
// range-del-aggregator per compaction outputs, for
// output_to_penultimate_level compaction it is only assigned to the
// penultimate level.
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
assert(range_del_agg_ == nullptr);
range_del_agg_ = std::move(range_del_agg);
}
const Compaction* compaction_;
// The current file is pending close, which needs to run `close_file_func()`
// first to add a new key.
bool pending_close_ = false;
// current output builder and writer
std::unique_ptr<TableBuilder> builder_;
std::unique_ptr<WritableFileWriter> file_writer_;
uint64_t current_output_file_size_ = 0;
// all the compaction outputs so far
std::vector<Output> outputs_;
// BlobDB info
std::vector<BlobFileAddition> blob_file_additions_;
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
// Basic compaction output stats for this level's outputs
InternalStats::CompactionOutputsStats stats_;
// indicate if this CompactionOutputs obj for penultimate_level, should always
// be false if per_key_placement feature is not enabled.
const bool is_penultimate_level_;
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_ = nullptr;
// partitioner information
std::string last_key_for_partitioner_;
std::unique_ptr<SstPartitioner> partitioner_;
};
// helper struct to concatenate the last level and penultimate level outputs
// which could be replaced by std::ranges::join_view() in c++20
struct OutputIterator {
public:
explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
const std::vector<CompactionOutputs::Output>& b)
: a_(a), b_(b) {
within_a = !a_.empty();
idx_ = 0;
}
OutputIterator begin() { return *this; }
OutputIterator end() { return *this; }
size_t size() { return a_.size() + b_.size(); }
const CompactionOutputs::Output& operator*() const {
return within_a ? a_[idx_] : b_[idx_];
}
OutputIterator& operator++() {
idx_++;
if (within_a && idx_ >= a_.size()) {
within_a = false;
idx_ = 0;
}
assert(within_a || idx_ <= b_.size());
return *this;
}
bool operator!=(const OutputIterator& /*rhs*/) const {
return within_a || idx_ < b_.size();
}
private:
const std::vector<CompactionOutputs::Output>& a_;
const std::vector<CompactionOutputs::Output>& b_;
bool within_a;
size_t idx_;
};
} // namespace ROCKSDB_NAMESPACE

@ -214,13 +214,13 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
}
void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
InternalKey* smallest,
InternalKey* largest) const {
InternalKey* smallest, InternalKey* largest,
int exclude_level) const {
InternalKey current_smallest;
InternalKey current_largest;
bool initialized = false;
for (const auto& in : inputs) {
if (in.empty()) {
if (in.empty() || in.level == exclude_level) {
continue;
}
GetRange(in, &current_smallest, &current_largest);
@ -293,6 +293,12 @@ bool CompactionPicker::RangeOverlapWithCompaction(
// Overlap
return true;
}
if (c->SupportsPerKeyPlacement()) {
if (c->OverlapPenultimateLevelOutputRange(smallest_user_key,
largest_user_key)) {
return true;
}
}
}
// Did not overlap with any running compaction in level `level`
return false;
@ -301,9 +307,11 @@ bool CompactionPicker::RangeOverlapWithCompaction(
bool CompactionPicker::FilesRangeOverlapWithCompaction(
const std::vector<CompactionInputFiles>& inputs, int level) const {
bool is_empty = true;
int start_level = -1;
for (auto& in : inputs) {
if (!in.empty()) {
is_empty = false;
start_level = in.level; // inputs are sorted by level
break;
}
}
@ -313,7 +321,19 @@ bool CompactionPicker::FilesRangeOverlapWithCompaction(
}
InternalKey smallest, largest;
GetRange(inputs, &smallest, &largest);
GetRange(inputs, &smallest, &largest, Compaction::kInvalidLevel);
int penultimate_level =
Compaction::EvaluatePenultimateLevel(ioptions_, start_level, level);
if (penultimate_level != Compaction::kInvalidLevel) {
InternalKey penultimate_smallest, penultimate_largest;
GetRange(inputs, &penultimate_smallest, &penultimate_largest, level);
if (RangeOverlapWithCompaction(penultimate_smallest.user_key(),
penultimate_largest.user_key(),
penultimate_level)) {
return true;
}
}
return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
level);
}

@ -154,7 +154,8 @@ class CompactionPicker {
// in *smallest, *largest.
// REQUIRES: inputs is not empty (at least on entry have one file)
void GetRange(const std::vector<CompactionInputFiles>& inputs,
InternalKey* smallest, InternalKey* largest) const;
InternalKey* smallest, InternalKey* largest,
int exclude_level) const;
int NumberLevels() const { return ioptions_.num_levels; }

@ -430,8 +430,7 @@ TEST_F(CompactionPickerTest, LevelTriggerDynamic4) {
#ifndef ROCKSDB_LITE
TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
NewVersionStorage(1, kCompactionStyleUniversal);
UniversalCompactionPicker universal_compaction_picker(
ioptions_, &icmp_);
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there's no files.
ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()),
@ -3048,6 +3047,192 @@ TEST_F(CompactionPickerTest, UniversalMarkedManualCompaction) {
ASSERT_EQ(0U, vstorage_->FilesMarkedForCompaction().size());
}
class PerKeyPlacementCompactionPickerTest
: public CompactionPickerTest,
public testing::WithParamInterface<bool> {
public:
PerKeyPlacementCompactionPickerTest() : CompactionPickerTest() {}
void SetUp() override { enable_per_key_placement_ = GetParam(); }
protected:
bool enable_per_key_placement_ = false;
};
TEST_P(PerKeyPlacementCompactionPickerTest, OverlapWithNormalCompaction) {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = enable_per_key_placement_;
});
SyncPoint::GetInstance()->EnableProcessing();
int num_levels = ioptions_.num_levels;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 21U, "100", "150", 60000000U);
Add(0, 22U, "300", "350", 60000000U);
Add(5, 40U, "200", "250", 60000000U);
Add(6, 50U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(40);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(level_compaction_picker.CompactFiles(
comp_options, input_files, 5, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
input_set.clear();
input_files.clear();
input_set.insert(21);
input_set.insert(22);
input_set.insert(50);
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(
enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(input_files, 6));
}
TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlap) {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = enable_per_key_placement_;
});
SyncPoint::GetInstance()->EnableProcessing();
int num_levels = ioptions_.num_levels;
NewVersionStorage(num_levels, kCompactionStyleLevel);
Add(0, 21U, "100", "150", 60000000U);
Add(0, 22U, "300", "350", 60000000U);
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(6, 50U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(21);
input_set.insert(22);
input_set.insert(50);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(level_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
input_set.clear();
input_files.clear();
input_set.insert(40);
input_set.insert(41);
ASSERT_OK(level_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(
enable_per_key_placement_,
level_compaction_picker.FilesRangeOverlapWithCompaction(input_files, 5));
}
TEST_P(PerKeyPlacementCompactionPickerTest,
OverlapWithNormalCompactionUniveral) {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = enable_per_key_placement_;
});
SyncPoint::GetInstance()->EnableProcessing();
int num_levels = ioptions_.num_levels;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
Add(0, 21U, "100", "150", 60000000U);
Add(0, 22U, "300", "350", 60000000U);
Add(5, 40U, "200", "250", 60000000U);
Add(6, 50U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(40);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 5, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
input_set.clear();
input_files.clear();
input_set.insert(21);
input_set.insert(22);
input_set.insert(50);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 6));
}
TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlapUniversal) {
SyncPoint::GetInstance()->SetCallBack(
"Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
auto supports_per_key_placement = static_cast<bool*>(arg);
*supports_per_key_placement = enable_per_key_placement_;
});
SyncPoint::GetInstance()->EnableProcessing();
int num_levels = ioptions_.num_levels;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(num_levels, kCompactionStyleUniversal);
Add(0, 21U, "100", "150", 60000000U);
Add(0, 22U, "300", "350", 60000000U);
Add(4, 40U, "200", "220", 60000000U);
Add(4, 41U, "230", "250", 60000000U);
Add(6, 50U, "101", "351", 60000000U);
UpdateVersionStorageInfo();
CompactionOptions comp_options;
std::unordered_set<uint64_t> input_set;
input_set.insert(21);
input_set.insert(22);
input_set.insert(50);
std::vector<CompactionInputFiles> input_files;
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
mutable_db_options_, 0));
input_set.clear();
input_files.clear();
input_set.insert(40);
input_set.insert(41);
ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
&input_files, &input_set, vstorage_.get(), comp_options));
ASSERT_EQ(enable_per_key_placement_,
universal_compaction_picker.FilesRangeOverlapWithCompaction(
input_files, 5));
}
INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompactionPickerTest,
PerKeyPlacementCompactionPickerTest, ::testing::Bool());
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,825 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction_job.h"
#include "db/compaction/compaction_state.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/utilities/options_type.h"
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {
class SubcompactionState;
CompactionServiceJobStatus
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
assert(db_options_.compaction_service);
const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;
compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;
const std::vector<CompactionInputFiles>& inputs =
*(compact_->compaction->inputs());
for (const auto& files_per_level : inputs) {
for (const auto& file : files_per_level.files) {
compaction_input.input_files.emplace_back(
MakeTableFileName(file->fd.GetNumber()));
}
}
compaction_input.column_family.name =
compaction->column_family_data()->GetName();
compaction_input.column_family.options =
compaction->column_family_data()->GetLatestCFOptions();
compaction_input.db_options =
BuildDBOptions(db_options_, mutable_db_options_copy_);
compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start;
compaction_input.begin =
compaction_input.has_begin ? sub_compact->start->ToString() : "";
compaction_input.has_end = sub_compact->end;
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
std::ostringstream input_files_oss;
bool is_first_one = true;
for (const auto& file : compaction_input.input_files) {
input_files_oss << (is_first_one ? "" : ", ") << file;
is_first_one = false;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->StartV2(info, compaction_input_binary);
switch (compaction_status) {
case CompactionServiceJobStatus::kSuccess:
break;
case CompactionServiceJobStatus::kFailure:
sub_compact->status = Status::Incomplete(
"CompactionService failed to start compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API Start.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
default:
assert(false); // unknown status
break;
}
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
std::string compaction_result_binary;
compaction_status = db_options_.compaction_service->WaitForCompleteV2(
info, &compaction_result_binary);
if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API "
"WaitForComplete.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}
CompactionServiceResult compaction_result;
s = CompactionServiceResult::Read(compaction_result_binary,
&compaction_result);
if (compaction_status == CompactionServiceJobStatus::kFailure) {
if (s.ok()) {
if (compaction_result.status.ok()) {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (even though "
"the internal status is okay).");
} else {
// set the current sub compaction status with the status returned from
// remote
sub_compact->status = compaction_result.status;
}
} else {
sub_compact->status = Status::Incomplete(
"CompactionService failed to run the compaction job (and no valid "
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_);
return compaction_status;
}
if (!s.ok()) {
sub_compact->status = s;
compaction_result.status.PermitUncheckedError();
return CompactionServiceJobStatus::kFailure;
}
sub_compact->status = compaction_result.status;
std::ostringstream output_files_oss;
is_first_one = true;
for (const auto& file : compaction_result.output_files) {
output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
is_first_one = false;
}
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Receive remote compaction result, output path: "
"%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction_result.output_path.c_str(),
output_files_oss.str().c_str());
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
for (const auto& file : compaction_result.output_files) {
uint64_t file_num = versions_->NewFileNumber();
auto src_file = compaction_result.output_path + "/" + file.file_name;
auto tgt_file = TableFileName(compaction->immutable_options()->cf_paths,
file_num, compaction->output_path_id());
s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
FileMetaData meta;
uint64_t file_size;
s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
if (!s.ok()) {
sub_compact->status = s;
return CompactionServiceJobStatus::kFailure;
}
meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
file.smallest_seqno, file.largest_seqno);
meta.smallest.DecodeFrom(file.smallest_internal_key);
meta.largest.DecodeFrom(file.largest_internal_key);
meta.oldest_ancester_time = file.oldest_ancester_time;
meta.file_creation_time = file.file_creation_time;
meta.marked_for_compaction = file.marked_for_compaction;
meta.unique_id = file.unique_id;
auto cfd = compaction->column_family_data();
sub_compact->Current().AddOutput(std::move(meta),
cfd->internal_comparator(), false, false,
true, file.paranoid_hash);
}
sub_compact->compaction_job_stats = compaction_result.stats;
sub_compact->Current().SetNumOutputRecords(
compaction_result.num_output_records);
sub_compact->Current().SetTotalBytes(compaction_result.total_bytes);
RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
compaction_result.bytes_written);
return CompactionServiceJobStatus::kSuccess;
}
std::string CompactionServiceCompactionJob::GetTableFileName(
uint64_t file_number) {
return MakeTableFileName(output_path_, file_number);
}
void CompactionServiceCompactionJob::RecordCompactionIOStats() {
compaction_result_->bytes_read += IOSTATS(bytes_read);
compaction_result_->bytes_written += IOSTATS(bytes_written);
CompactionJob::RecordCompactionIOStats();
}
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
VersionSet* versions, const std::atomic<bool>* shutting_down,
LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<bool>& manual_compaction_canceled,
const std::string& db_id, const std::string& db_session_id,
std::string output_path,
const CompactionServiceInput& compaction_service_input,
CompactionServiceResult* compaction_service_result)
: CompactionJob(
job_id, compaction, db_options, mutable_db_options, file_options,
versions, shutting_down, log_buffer, nullptr, output_directory,
nullptr, stats, db_mutex, db_error_handler,
std::move(existing_snapshots), kMaxSequenceNumber, nullptr, nullptr,
std::move(table_cache), event_logger,
compaction->mutable_cf_options()->paranoid_file_checks,
compaction->mutable_cf_options()->report_bg_io_stats, dbname,
&(compaction_service_result->stats), Env::Priority::USER, io_tracer,
manual_compaction_canceled, db_id, db_session_id,
compaction->column_family_data()->GetFullHistoryTsLow()),
output_path_(std::move(output_path)),
compaction_input_(compaction_service_input),
compaction_result_(compaction_service_result) {}
Status CompactionServiceCompactionJob::Run() {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_RUN);
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
write_hint_ =
c->column_family_data()->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
Slice begin = compaction_input_.begin;
Slice end = compaction_input_.end;
compact_->sub_compact_states.emplace_back(
c, compaction_input_.has_begin ? &begin : nullptr,
compaction_input_.has_end ? &end : nullptr, /*sub_job_id*/ 0);
log_buffer_->FlushBufferToLog();
LogCompaction();
const uint64_t start_micros = db_options_.clock->NowMicros();
// Pick the only sub-compaction we should have
assert(compact_->sub_compact_states.size() == 1);
SubcompactionState* sub_compact = compact_->sub_compact_states.data();
ProcessKeyValueCompaction(sub_compact);
compaction_stats_.stats.micros =
db_options_.clock->NowMicros() - start_micros;
compaction_stats_.stats.cpu_micros =
sub_compact->compaction_job_stats.cpu_micros;
RecordTimeToHistogram(stats_, COMPACTION_TIME,
compaction_stats_.stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
compaction_stats_.stats.cpu_micros);
Status status = sub_compact->status;
IOStatus io_s = sub_compact->io_status;
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
DirFsyncOptions());
}
}
if (io_status_.ok()) {
io_status_ = io_s;
}
if (status.ok()) {
status = io_s;
}
if (status.ok()) {
// TODO: Add verify_table()
}
// Finish up all book-keeping to unify the subcompaction results
compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
UpdateCompactionStats();
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
compact_->status = status;
compact_->status.PermitUncheckedError();
// Build compaction result
compaction_result_->output_level = compact_->compaction->output_level();
compaction_result_->output_path = output_path_;
for (const auto& output_file : sub_compact->GetOutputs()) {
auto& meta = output_file.meta;
compaction_result_->output_files.emplace_back(
MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
meta.largest.Encode().ToString(), meta.oldest_ancester_time,
meta.file_creation_time, output_file.validator.GetHash(),
meta.marked_for_compaction, meta.unique_id);
}
InternalStats::CompactionStatsFull compaction_stats;
sub_compact->AggregateCompactionStats(compaction_stats);
compaction_result_->num_output_records =
compaction_stats.stats.num_output_records;
compaction_result_->total_bytes = compaction_stats.TotalBytesWritten();
return status;
}
void CompactionServiceCompactionJob::CleanupCompaction() {
CompactionJob::CleanupCompaction();
}
// Internal binary format for the input and result data
enum BinaryFormatVersion : uint32_t {
kOptionsString = 1, // Use string format similar to Option string format
};
static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
{"name",
{offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options",
{offsetof(struct ColumnFamilyDescriptor, options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
value, cf_options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
std::string result;
auto status =
GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
auto this_conf = CFOptionsAsConfigurable(*this_one);
auto that_conf = CFOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"column_family",
OptionTypeInfo::Struct(
"column_family", &cfd_type_info,
offsetof(struct CompactionServiceInput, column_family),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"db_options",
{offsetof(struct CompactionServiceInput, db_options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto options = static_cast<DBOptions*>(addr);
return GetDBOptionsFromString(opts, DBOptions(), value, options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto options = static_cast<const DBOptions*>(addr);
std::string result;
auto status = GetStringFromDBOptions(opts, *options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const DBOptions*>(addr1);
const auto that_one = static_cast<const DBOptions*>(addr2);
auto this_conf = DBOptionsAsConfigurable(*this_one);
auto that_conf = DBOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
{"input_files", OptionTypeInfo::Vector<std::string>(
offsetof(struct CompactionServiceInput, input_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kEncodedString})},
{"output_level",
{offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"db_id",
{offsetof(struct CompactionServiceInput, db_id),
OptionType::kEncodedString}},
{"has_begin",
{offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"begin",
{offsetof(struct CompactionServiceInput, begin),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"has_end",
{offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo>
cs_output_file_type_info = {
{"file_name",
{offsetof(struct CompactionServiceOutputFile, file_name),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_seqno",
{offsetof(struct CompactionServiceOutputFile, smallest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_seqno",
{offsetof(struct CompactionServiceOutputFile, largest_seqno),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_internal_key",
{offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_internal_key",
{offsetof(struct CompactionServiceOutputFile, largest_internal_key),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"oldest_ancester_time",
{offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_creation_time",
{offsetof(struct CompactionServiceOutputFile, file_creation_time),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"paranoid_hash",
{offsetof(struct CompactionServiceOutputFile, paranoid_hash),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"marked_for_compaction",
{offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"unique_id",
OptionTypeInfo::Array<uint64_t, 2>(
offsetof(struct CompactionServiceOutputFile, unique_id),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
{0, OptionType::kUInt64T})},
};
static std::unordered_map<std::string, OptionTypeInfo>
compaction_job_stats_type_info = {
{"elapsed_micros",
{offsetof(struct CompactionJobStats, elapsed_micros),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"cpu_micros",
{offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"num_input_records",
{offsetof(struct CompactionJobStats, num_input_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_blobs_read",
{offsetof(struct CompactionJobStats, num_blobs_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files",
{offsetof(struct CompactionJobStats, num_input_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_files_at_output_level",
{offsetof(struct CompactionJobStats, num_input_files_at_output_level),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct CompactionJobStats, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files",
{offsetof(struct CompactionJobStats, num_output_files),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_files_blob",
{offsetof(struct CompactionJobStats, num_output_files_blob),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_full_compaction",
{offsetof(struct CompactionJobStats, is_full_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"is_manual_compaction",
{offsetof(struct CompactionJobStats, is_manual_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_bytes",
{offsetof(struct CompactionJobStats, total_input_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_blob_bytes_read",
{offsetof(struct CompactionJobStats, total_blob_bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes",
{offsetof(struct CompactionJobStats, total_output_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_output_bytes_blob",
{offsetof(struct CompactionJobStats, total_output_bytes_blob),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_records_replaced",
{offsetof(struct CompactionJobStats, num_records_replaced),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_key_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_input_raw_value_bytes",
{offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_input_deletion_records",
{offsetof(struct CompactionJobStats, num_input_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_expired_deletion_records",
{offsetof(struct CompactionJobStats, num_expired_deletion_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_corrupt_keys",
{offsetof(struct CompactionJobStats, num_corrupt_keys),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_write_nanos",
{offsetof(struct CompactionJobStats, file_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_range_sync_nanos",
{offsetof(struct CompactionJobStats, file_range_sync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_fsync_nanos",
{offsetof(struct CompactionJobStats, file_fsync_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"file_prepare_write_nanos",
{offsetof(struct CompactionJobStats, file_prepare_write_nanos),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"smallest_output_key_prefix",
{offsetof(struct CompactionJobStats, smallest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"largest_output_key_prefix",
{offsetof(struct CompactionJobStats, largest_output_key_prefix),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_fallthru",
{offsetof(struct CompactionJobStats, num_single_del_fallthru),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_single_del_mismatch",
{offsetof(struct CompactionJobStats, num_single_del_mismatch),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
namespace {
// this is a helper struct to serialize and deserialize class Status, because
// Status's members are not public.
struct StatusSerializationAdapter {
uint8_t code;
uint8_t subcode;
uint8_t severity;
std::string message;
StatusSerializationAdapter() = default;
explicit StatusSerializationAdapter(const Status& s) {
code = s.code();
subcode = s.subcode();
severity = s.severity();
auto msg = s.getState();
message = msg ? msg : "";
}
Status GetStatus() const {
return Status{static_cast<Status::Code>(code),
static_cast<Status::SubCode>(subcode),
static_cast<Status::Severity>(severity), message};
}
};
} // namespace
static std::unordered_map<std::string, OptionTypeInfo>
status_adapter_type_info = {
{"code",
{offsetof(struct StatusSerializationAdapter, code),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"subcode",
{offsetof(struct StatusSerializationAdapter, subcode),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"severity",
{offsetof(struct StatusSerializationAdapter, severity),
OptionType::kUInt8T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"message",
{offsetof(struct StatusSerializationAdapter, message),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};
static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
{"status",
{offsetof(struct CompactionServiceResult, status),
OptionType::kCustomizable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto status_obj = static_cast<Status*>(addr);
StatusSerializationAdapter adapter;
Status s = OptionTypeInfo::ParseType(
opts, value, status_adapter_type_info, &adapter);
*status_obj = adapter.GetStatus();
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto status_obj = static_cast<const Status*>(addr);
StatusSerializationAdapter adapter(*status_obj);
std::string result;
Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
&adapter, &result);
*value = "{" + result + "}";
return s;
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr1, const void* addr2, std::string* mismatch) {
const auto status1 = static_cast<const Status*>(addr1);
const auto status2 = static_cast<const Status*>(addr2);
StatusSerializationAdapter adatper1(*status1);
StatusSerializationAdapter adapter2(*status2);
return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
&adatper1, &adapter2, mismatch);
}}},
{"output_files",
OptionTypeInfo::Vector<CompactionServiceOutputFile>(
offsetof(struct CompactionServiceResult, output_files),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
OptionVerificationType::kNormal,
OptionTypeFlags::kNone))},
{"output_level",
{offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"output_path",
{offsetof(struct CompactionServiceResult, output_path),
OptionType::kEncodedString, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"num_output_records",
{offsetof(struct CompactionServiceResult, num_output_records),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"total_bytes",
{offsetof(struct CompactionServiceResult, total_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_read",
{offsetof(struct CompactionServiceResult, bytes_read),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"bytes_written",
{offsetof(struct CompactionServiceResult, bytes_written),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"stats", OptionTypeInfo::Struct(
"stats", &compaction_job_stats_type_info,
offsetof(struct CompactionServiceResult, stats),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
};
Status CompactionServiceInput::Read(const std::string& data_str,
CompactionServiceInput* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceInput string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Input data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceInput::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
}
Status CompactionServiceResult::Read(const std::string& data_str,
CompactionServiceResult* obj) {
if (data_str.size() <= sizeof(BinaryFormatVersion)) {
return Status::InvalidArgument("Invalid CompactionServiceResult string");
}
auto format_version = DecodeFixed32(data_str.data());
if (format_version == kOptionsString) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
cf.ignore_unknown_options = true;
return OptionTypeInfo::ParseType(
cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
obj);
} else {
return Status::NotSupported(
"Compaction Service Result data version not supported: " +
std::to_string(format_version));
}
}
Status CompactionServiceResult::Write(std::string* output) {
char buf[sizeof(BinaryFormatVersion)];
EncodeFixed32(buf, kOptionsString);
output->append(buf, sizeof(BinaryFormatVersion));
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
}
#ifndef NDEBUG
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
std::string mismatch;
return TEST_Equals(other, &mismatch);
}
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
std::string* mismatch) {
ConfigOptions cf;
cf.invoke_prepare_options = false;
return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
mismatch);
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

@ -0,0 +1,46 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/compaction_state.h"
namespace ROCKSDB_NAMESPACE {
Slice CompactionState::SmallestUserKey() {
for (const auto& sub_compact_state : sub_compact_states) {
Slice smallest = sub_compact_state.SmallestUserKey();
if (!smallest.empty()) {
return smallest;
}
}
// If there is no finished output, return an empty slice.
return Slice{nullptr, 0};
}
Slice CompactionState::LargestUserKey() {
for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
++it) {
Slice largest = it->LargestUserKey();
if (!largest.empty()) {
return largest;
}
}
// If there is no finished output, return an empty slice.
return Slice{nullptr, 0};
}
void CompactionState::AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats,
CompactionJobStats& compaction_job_stats) {
for (const auto& sc : sub_compact_states) {
sc.AggregateCompactionStats(compaction_stats);
compaction_job_stats.Add(sc.compaction_job_stats);
}
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,42 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/compaction/compaction.h"
#include "db/compaction/subcompaction_state.h"
#include "db/internal_stats.h"
// Data structures used for compaction_job and compaction_service_job which has
// the list of sub_compact_states and the aggregated information for the
// compaction.
namespace ROCKSDB_NAMESPACE {
// Maintains state for the entire compaction
class CompactionState {
public:
Compaction* const compaction;
// REQUIRED: subcompaction states are stored in order of increasing key-range
std::vector<SubcompactionState> sub_compact_states;
Status status;
void AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats,
CompactionJobStats& compaction_job_stats);
explicit CompactionState(Compaction* c) : compaction(c) {}
Slice SmallestUserKey();
Slice LargestUserKey();
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,223 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/subcompaction_state.h"
#include "rocksdb/sst_partitioner.h"
namespace ROCKSDB_NAMESPACE {
void SubcompactionState::AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats) const {
compaction_stats.stats.Add(compaction_outputs_.stats_);
if (HasPenultimateLevelOutputs()) {
compaction_stats.has_penultimate_level_output = true;
compaction_stats.penultimate_level_stats.Add(
penultimate_level_outputs_.stats_);
}
}
void SubcompactionState::FillFilesToCutForTtl() {
if (compaction->immutable_options()->compaction_style !=
CompactionStyle::kCompactionStyleLevel ||
compaction->immutable_options()->compaction_pri !=
CompactionPri::kMinOverlappingRatio ||
compaction->mutable_cf_options()->ttl == 0 ||
compaction->num_input_levels() < 2 || compaction->bottommost_level()) {
return;
}
// We define new file with the oldest ancestor time to be younger than 1/4
// TTL, and an old one to be older than 1/2 TTL time.
int64_t temp_current_time;
auto get_time_status = compaction->immutable_options()->clock->GetCurrentTime(
&temp_current_time);
if (!get_time_status.ok()) {
return;
}
auto current_time = static_cast<uint64_t>(temp_current_time);
if (current_time < compaction->mutable_cf_options()->ttl) {
return;
}
uint64_t old_age_thres =
current_time - compaction->mutable_cf_options()->ttl / 2;
const std::vector<FileMetaData*>& olevel =
*(compaction->inputs(compaction->num_input_levels() - 1));
for (FileMetaData* file : olevel) {
// Worth filtering out by start and end?
uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
// We put old files if they are not too small to prevent a flood
// of small files.
if (oldest_ancester_time < old_age_thres &&
file->fd.GetFileSize() >
compaction->mutable_cf_options()->target_file_size_base / 2) {
files_to_cut_for_ttl_.push_back(file);
}
}
}
OutputIterator SubcompactionState::GetOutputs() const {
return OutputIterator(penultimate_level_outputs_.outputs_,
compaction_outputs_.outputs_);
}
void SubcompactionState::Cleanup(Cache* cache) {
penultimate_level_outputs_.Cleanup();
compaction_outputs_.Cleanup();
if (!status.ok()) {
for (const auto& out : GetOutputs()) {
// If this file was inserted into the table cache then remove
// them here because this compaction was not committed.
TableCache::Evict(cache, out.meta.fd.GetNumber());
}
}
// TODO: sub_compact.io_status is not checked like status. Not sure if thats
// intentional. So ignoring the io_status as of now.
io_status.PermitUncheckedError();
}
Slice SubcompactionState::SmallestUserKey() const {
if (has_penultimate_level_outputs_) {
Slice a = compaction_outputs_.SmallestUserKey();
Slice b = penultimate_level_outputs_.SmallestUserKey();
if (a.empty()) {
return b;
}
if (b.empty()) {
return a;
}
const Comparator* user_cmp =
compaction->column_family_data()->user_comparator();
if (user_cmp->Compare(a, b) > 0) {
return b;
} else {
return a;
}
} else {
return compaction_outputs_.SmallestUserKey();
}
}
Slice SubcompactionState::LargestUserKey() const {
if (has_penultimate_level_outputs_) {
Slice a = compaction_outputs_.LargestUserKey();
Slice b = penultimate_level_outputs_.LargestUserKey();
if (a.empty()) {
return b;
}
if (b.empty()) {
return a;
}
const Comparator* user_cmp =
compaction->column_family_data()->user_comparator();
if (user_cmp->Compare(a, b) < 0) {
return b;
} else {
return a;
}
} else {
return compaction_outputs_.LargestUserKey();
}
}
bool SubcompactionState::ShouldStopBefore(const Slice& internal_key) {
uint64_t curr_file_size = Current().GetCurrentOutputFileSize();
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
// Invalid local_output_split_key indicates that we do not need to split
if (local_output_split_key_ != nullptr && !is_split_) {
// Split occurs when the next key is larger than/equal to the cursor
if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
is_split_ = true;
return true;
}
}
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
bool grandparant_file_switched = false;
// Scan to find the earliest grandparent file that contains key.
while (grandparent_index_ < grandparents.size() &&
icmp->Compare(internal_key,
grandparents[grandparent_index_]->largest.Encode()) >
0) {
if (seen_key_) {
overlapped_bytes_ += grandparents[grandparent_index_]->fd.GetFileSize();
grandparant_file_switched = true;
}
assert(grandparent_index_ + 1 >= grandparents.size() ||
icmp->Compare(
grandparents[grandparent_index_]->largest.Encode(),
grandparents[grandparent_index_ + 1]->smallest.Encode()) <= 0);
grandparent_index_++;
}
seen_key_ = true;
if (grandparant_file_switched &&
overlapped_bytes_ + curr_file_size > compaction->max_compaction_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
}
if (!files_to_cut_for_ttl_.empty()) {
if (cur_files_to_cut_for_ttl_ != -1) {
// Previous key is inside the range of a file
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
->largest.Encode()) > 0) {
next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
cur_files_to_cut_for_ttl_ = -1;
return true;
}
} else {
// Look for the key position
while (next_files_to_cut_for_ttl_ <
static_cast<int>(files_to_cut_for_ttl_.size())) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->smallest.Encode()) >= 0) {
if (icmp->Compare(internal_key,
files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
->largest.Encode()) <= 0) {
// With in the current file
cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
return true;
}
// Beyond the current file
next_files_to_cut_for_ttl_++;
} else {
// Still fall into the gap
break;
}
}
}
}
return false;
}
Status SubcompactionState::AddToOutput(
const CompactionIterator& iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
// update target output first
is_current_penultimate_level_ = iter.output_to_penultimate_level();
current_outputs_ = is_current_penultimate_level_ ? &penultimate_level_outputs_
: &compaction_outputs_;
if (is_current_penultimate_level_) {
has_penultimate_level_outputs_ = true;
}
return Current().AddToOutput(iter, open_file_func, close_file_func);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,255 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
namespace ROCKSDB_NAMESPACE {
// Maintains state and outputs for each sub-compaction
// It contains 2 `CompactionOutputs`:
// 1. one for the normal output files
// 2. another for the penultimate level outputs
// a `current` pointer maintains the current output group, when calling
// `AddToOutput()`, it checks the output of the current compaction_iterator key
// and point `current` to the target output group. By default, it just points to
// normal compaction_outputs, if the compaction_iterator key should be placed on
// the penultimate level, `current` is changed to point to
// `penultimate_level_outputs`.
// The later operations uses `Current()` to get the target group.
//
// +----------+ +-----------------------------+ +---------+
// | *current |--------> | compaction_outputs |----->| output |
// +----------+ +-----------------------------+ +---------+
// | | output |
// | +---------+
// | | ... |
// |
// | +-----------------------------+ +---------+
// +-------------> | penultimate_level_outputs |----->| output |
// +-----------------------------+ +---------+
// | ... |
class SubcompactionState {
public:
const Compaction* compaction;
// The boundaries of the key-range this compaction is interested in. No two
// sub-compactions may have overlapping key-ranges.
// 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
const Slice *start, *end;
// The return status of this sub-compaction
Status status;
// The return IO Status of this sub-compaction
IOStatus io_status;
// Notify on sub-compaction completion only if listener was notified on
// sub-compaction begin.
bool notify_on_subcompaction_completion = false;
// compaction job stats for this sub-compaction
CompactionJobStats compaction_job_stats;
// sub-compaction job id, which is used to identify different sub-compaction
// within the same compaction job.
const uint32_t sub_job_id;
Slice SmallestUserKey() const;
Slice LargestUserKey() const;
// Get all outputs from the subcompaction. For per_key_placement compaction,
// it returns both the last level outputs and penultimate level outputs.
OutputIterator GetOutputs() const;
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {
penultimate_level_outputs_.AssignRangeDelAggregator(
std::move(range_del_agg));
} else {
compaction_outputs_.AssignRangeDelAggregator(std::move(range_del_agg));
}
}
void RemoveLastEmptyOutput() {
compaction_outputs_.RemoveLastEmptyOutput();
penultimate_level_outputs_.RemoveLastEmptyOutput();
}
#ifndef ROCKSDB_LITE
void BuildSubcompactionJobInfo(
SubcompactionJobInfo& subcompaction_job_info) const {
const Compaction* c = compaction;
const ColumnFamilyData* cfd = c->column_family_data();
subcompaction_job_info.cf_id = cfd->GetID();
subcompaction_job_info.cf_name = cfd->GetName();
subcompaction_job_info.status = status;
subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
subcompaction_job_info.base_input_level = c->start_level();
subcompaction_job_info.output_level = c->output_level();
subcompaction_job_info.stats = compaction_job_stats;
}
#endif // !ROCKSDB_LITE
SubcompactionState() = delete;
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint32_t _sub_job_id)
: compaction(c),
start(_start),
end(_end),
sub_job_id(_sub_job_id),
compaction_outputs_(c, /*is_penultimate_level=*/false),
penultimate_level_outputs_(c, /*is_penultimate_level=*/true) {
assert(compaction != nullptr);
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
const InternalKey* output_split_key = compaction->GetOutputSplitKey();
// Invalid output_split_key indicates that we do not need to split
if (output_split_key != nullptr) {
// We may only split the output when the cursor is in the range. Split
if ((end == nullptr || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()),
ExtractUserKey(*end)) < 0) &&
(start == nullptr || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()),
ExtractUserKey(*start)) > 0)) {
local_output_split_key_ = output_split_key;
}
}
}
SubcompactionState(SubcompactionState&& state) noexcept
: compaction(state.compaction),
start(state.start),
end(state.end),
status(std::move(state.status)),
io_status(std::move(state.io_status)),
notify_on_subcompaction_completion(
state.notify_on_subcompaction_completion),
compaction_job_stats(std::move(state.compaction_job_stats)),
sub_job_id(state.sub_job_id),
files_to_cut_for_ttl_(std::move(state.files_to_cut_for_ttl_)),
cur_files_to_cut_for_ttl_(state.cur_files_to_cut_for_ttl_),
next_files_to_cut_for_ttl_(state.next_files_to_cut_for_ttl_),
grandparent_index_(state.grandparent_index_),
overlapped_bytes_(state.overlapped_bytes_),
seen_key_(state.seen_key_),
compaction_outputs_(std::move(state.compaction_outputs_)),
penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
is_current_penultimate_level_(state.is_current_penultimate_level_),
has_penultimate_level_outputs_(state.has_penultimate_level_outputs_) {
current_outputs_ = is_current_penultimate_level_
? &penultimate_level_outputs_
: &compaction_outputs_;
}
bool HasPenultimateLevelOutputs() const {
return has_penultimate_level_outputs_ ||
penultimate_level_outputs_.HasRangeDel();
}
void FillFilesToCutForTtl();
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
bool IsCurrentPenultimateLevel() const {
return is_current_penultimate_level_;
}
// Add all the new files from this compaction to version_edit
void AddOutputsEdit(VersionEdit* out_edit) const {
for (const auto& file : penultimate_level_outputs_.outputs_) {
out_edit->AddFile(compaction->GetPenultimateLevel(), file.meta);
}
for (const auto& file : compaction_outputs_.outputs_) {
out_edit->AddFile(compaction->output_level(), file.meta);
}
}
void Cleanup(Cache* cache);
void AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats) const;
CompactionOutputs& Current() const {
assert(current_outputs_);
return *current_outputs_;
}
// Add compaction_iterator key/value to the `Current` output group.
Status AddToOutput(const CompactionIterator& iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func);
// Close all compaction output files, both output_to_penultimate_level outputs
// and normal outputs.
Status CloseCompactionFiles(const Status& curr_status,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
// close the output file.
Status s = penultimate_level_outputs_.CloseOutput(
curr_status, open_file_func, close_file_func);
s = compaction_outputs_.CloseOutput(s, open_file_func, close_file_func);
return s;
}
private:
// Some identified files with old oldest ancester time and the range should be
// isolated out so that the output file(s) in that range can be merged down
// for TTL and clear the timestamps for the range.
std::vector<FileMetaData*> files_to_cut_for_ttl_;
int cur_files_to_cut_for_ttl_ = -1;
int next_files_to_cut_for_ttl_ = 0;
// An index that used to speed up ShouldStopBefore().
size_t grandparent_index_ = 0;
// The number of bytes overlapping between the current output and
// grandparent files used in ShouldStopBefore().
uint64_t overlapped_bytes_ = 0;
// A flag determines whether the key has been seen in ShouldStopBefore()
bool seen_key_ = false;
// A flag determines if this subcompaction has been split by the cursor
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
// repetitive comparison in ShouldStopBefore()
const InternalKey* local_output_split_key_ = nullptr;
// State kept for output being generated
CompactionOutputs compaction_outputs_;
CompactionOutputs penultimate_level_outputs_;
CompactionOutputs* current_outputs_ = &compaction_outputs_;
bool is_current_penultimate_level_ = false;
bool has_penultimate_level_outputs_ = false;
};
} // namespace ROCKSDB_NAMESPACE

File diff suppressed because it is too large Load Diff

@ -30,10 +30,100 @@ namespace ROCKSDB_NAMESPACE {
// SYNC_POINT is not supported in released Windows mode.
#if !defined(ROCKSDB_LITE)
class CompactionStatsCollector : public EventListener {
public:
CompactionStatsCollector()
: compaction_completed_(
static_cast<int>(CompactionReason::kNumOfReasons)) {
for (auto& v : compaction_completed_) {
v.store(0);
}
}
~CompactionStatsCollector() override {}
void OnCompactionCompleted(DB* /* db */,
const CompactionJobInfo& info) override {
int k = static_cast<int>(info.compaction_reason);
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
assert(k >= 0 && k < num_of_reasons);
compaction_completed_[k]++;
}
void OnExternalFileIngested(
DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
compaction_completed_[k]++;
}
void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kFlush);
compaction_completed_[k]++;
}
int NumberOfCompactions(CompactionReason reason) const {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
int k = static_cast<int>(reason);
assert(k >= 0 && k < num_of_reasons);
return compaction_completed_.at(k).load();
}
private:
std::vector<std::atomic<int>> compaction_completed_;
};
class DBCompactionTest : public DBTestBase {
public:
DBCompactionTest()
: DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {}
protected:
#ifndef ROCKSDB_LITE
uint64_t GetSstSizeHelper(Temperature temperature) {
std::string prop;
EXPECT_TRUE(dbfull()->GetProperty(
DB::Properties::kLiveSstFilesSizeAtTemperature +
std::to_string(static_cast<uint8_t>(temperature)),
&prop));
return static_cast<uint64_t>(std::atoi(prop.c_str()));
}
#endif // ROCKSDB_LITE
/*
* Verifies compaction stats of cfd are valid.
*
* For each level of cfd, its compaction stats are valid if
* 1) sum(stat.counts) == stat.count, and
* 2) stat.counts[i] == collector.NumberOfCompactions(i)
*/
void VerifyCompactionStats(ColumnFamilyData& cfd,
const CompactionStatsCollector& collector) {
#ifndef NDEBUG
InternalStats* internal_stats_ptr = cfd.internal_stats();
ASSERT_NE(internal_stats_ptr, nullptr);
const std::vector<InternalStats::CompactionStats>& comp_stats =
internal_stats_ptr->TEST_GetCompactionStats();
const int num_of_reasons =
static_cast<int>(CompactionReason::kNumOfReasons);
std::vector<int> counts(num_of_reasons, 0);
// Count the number of compactions caused by each CompactionReason across
// all levels.
for (const auto& stat : comp_stats) {
int sum = 0;
for (int i = 0; i < num_of_reasons; i++) {
counts[i] += stat.counts[i];
sum += stat.counts[i];
}
ASSERT_EQ(sum, stat.count);
}
// Verify InternalStats bookkeeping matches that of
// CompactionStatsCollector, assuming that all compactions complete.
for (int i = 0; i < num_of_reasons; i++) {
ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)),
counts[i]);
}
#endif /* NDEBUG */
}
};
class DBCompactionTestWithParam
@ -110,47 +200,6 @@ class FlushedFileCollector : public EventListener {
std::mutex mutex_;
};
class CompactionStatsCollector : public EventListener {
public:
CompactionStatsCollector()
: compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
for (auto& v : compaction_completed_) {
v.store(0);
}
}
~CompactionStatsCollector() override {}
void OnCompactionCompleted(DB* /* db */,
const CompactionJobInfo& info) override {
int k = static_cast<int>(info.compaction_reason);
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
assert(k >= 0 && k < num_of_reasons);
compaction_completed_[k]++;
}
void OnExternalFileIngested(
DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
compaction_completed_[k]++;
}
void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kFlush);
compaction_completed_[k]++;
}
int NumberOfCompactions(CompactionReason reason) const {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
int k = static_cast<int>(reason);
assert(k >= 0 && k < num_of_reasons);
return compaction_completed_.at(k).load();
}
private:
std::vector<std::atomic<int>> compaction_completed_;
};
class SstStatsCollector : public EventListener {
public:
SstStatsCollector() : num_ssts_creation_started_(0) {}
@ -247,40 +296,6 @@ void VerifyCompactionResult(
#endif
}
/*
* Verifies compaction stats of cfd are valid.
*
* For each level of cfd, its compaction stats are valid if
* 1) sum(stat.counts) == stat.count, and
* 2) stat.counts[i] == collector.NumberOfCompactions(i)
*/
void VerifyCompactionStats(ColumnFamilyData& cfd,
const CompactionStatsCollector& collector) {
#ifndef NDEBUG
InternalStats* internal_stats_ptr = cfd.internal_stats();
ASSERT_NE(internal_stats_ptr, nullptr);
const std::vector<InternalStats::CompactionStats>& comp_stats =
internal_stats_ptr->TEST_GetCompactionStats();
const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
std::vector<int> counts(num_of_reasons, 0);
// Count the number of compactions caused by each CompactionReason across
// all levels.
for (const auto& stat : comp_stats) {
int sum = 0;
for (int i = 0; i < num_of_reasons; i++) {
counts[i] += stat.counts[i];
sum += stat.counts[i];
}
ASSERT_EQ(sum, stat.count);
}
// Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
// assuming that all compactions complete.
for (int i = 0; i < num_of_reasons; i++) {
ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
}
#endif /* NDEBUG */
}
const SstFileMetaData* PickFileRandomly(
const ColumnFamilyMetaData& cf_meta,
Random* rand,

@ -20,8 +20,6 @@
#include "rocksdb/system_clock.h"
#include "util/hash_containers.h"
class ColumnFamilyData;
namespace ROCKSDB_NAMESPACE {
template <class Stats>
@ -140,6 +138,23 @@ class InternalStats {
InternalStats(int num_levels, SystemClock* clock, ColumnFamilyData* cfd);
// Per level compaction stats
struct CompactionOutputsStats {
uint64_t num_output_records = 0;
uint64_t bytes_written = 0;
uint64_t bytes_written_blob = 0;
uint64_t num_output_files = 0;
uint64_t num_output_files_blob = 0;
void Add(const CompactionOutputsStats& stats) {
this->num_output_records += stats.num_output_records;
this->bytes_written += stats.bytes_written;
this->bytes_written_blob += stats.bytes_written_blob;
this->num_output_files += stats.num_output_files;
this->num_output_files_blob += stats.num_output_files_blob;
}
};
// Per level compaction stats. comp_stats_[level] stores the stats for
// compactions that produced data for the specified "level".
struct CompactionStats {
@ -184,11 +199,14 @@ class InternalStats {
// (num input entries - num output entries) for compaction levels N and N+1
uint64_t num_dropped_records;
// Total output entries from compaction
uint64_t num_output_records;
// Number of compactions done
int count;
// Number of compactions done per CompactionReason
int counts[static_cast<int>(CompactionReason::kNumOfReasons)];
int counts[static_cast<int>(CompactionReason::kNumOfReasons)]{};
explicit CompactionStats()
: micros(0),
@ -205,6 +223,7 @@ class InternalStats {
num_output_files_blob(0),
num_input_records(0),
num_dropped_records(0),
num_output_records(0),
count(0) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
@ -227,6 +246,7 @@ class InternalStats {
num_output_files_blob(0),
num_input_records(0),
num_dropped_records(0),
num_output_records(0),
count(c) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
@ -240,7 +260,7 @@ class InternalStats {
}
}
explicit CompactionStats(const CompactionStats& c)
CompactionStats(const CompactionStats& c)
: micros(c.micros),
cpu_micros(c.cpu_micros),
bytes_read_non_output_levels(c.bytes_read_non_output_levels),
@ -256,6 +276,7 @@ class InternalStats {
num_output_files_blob(c.num_output_files_blob),
num_input_records(c.num_input_records),
num_dropped_records(c.num_dropped_records),
num_output_records(c.num_output_records),
count(c.count) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
@ -279,6 +300,7 @@ class InternalStats {
num_output_files_blob = c.num_output_files_blob;
num_input_records = c.num_input_records;
num_dropped_records = c.num_dropped_records;
num_output_records = c.num_output_records;
count = c.count;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
@ -303,6 +325,7 @@ class InternalStats {
this->num_output_files_blob = 0;
this->num_input_records = 0;
this->num_dropped_records = 0;
this->num_output_records = 0;
this->count = 0;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
@ -327,6 +350,7 @@ class InternalStats {
this->num_output_files_blob += c.num_output_files_blob;
this->num_input_records += c.num_input_records;
this->num_dropped_records += c.num_dropped_records;
this->num_output_records += c.num_output_records;
this->count += c.count;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i< num_of_reasons; i++) {
@ -334,6 +358,15 @@ class InternalStats {
}
}
void Add(const CompactionOutputsStats& stats) {
this->num_output_files += static_cast<int>(stats.num_output_files);
this->num_output_records += stats.num_output_records;
this->bytes_written += stats.bytes_written;
this->bytes_written_blob += stats.bytes_written_blob;
this->num_output_files_blob +=
static_cast<int>(stats.num_output_files_blob);
}
void Subtract(const CompactionStats& c) {
this->micros -= c.micros;
this->cpu_micros -= c.cpu_micros;
@ -351,12 +384,70 @@ class InternalStats {
this->num_output_files_blob -= c.num_output_files_blob;
this->num_input_records -= c.num_input_records;
this->num_dropped_records -= c.num_dropped_records;
this->num_output_records -= c.num_output_records;
this->count -= c.count;
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
for (int i = 0; i < num_of_reasons; i++) {
counts[i] -= c.counts[i];
}
}
void ResetCompactionReason(CompactionReason reason) {
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
assert(count == 1); // only support update one compaction reason
for (int i = 0; i < num_of_reasons; i++) {
counts[i] = 0;
}
int r = static_cast<int>(reason);
assert(r >= 0 && r < num_of_reasons);
counts[r] = 1;
}
};
// Compaction stats, for per_key_placement compaction, it includes 2 levels
// stats: the last level and the penultimate level.
struct CompactionStatsFull {
// the stats for the target primary output level
CompactionStats stats;
// stats for penultimate level output if exist
bool has_penultimate_level_output = false;
CompactionStats penultimate_level_stats;
explicit CompactionStatsFull() : stats(), penultimate_level_stats() {}
explicit CompactionStatsFull(CompactionReason reason, int c)
: stats(reason, c), penultimate_level_stats(reason, c){};
uint64_t TotalBytesWritten() const {
uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob;
if (has_penultimate_level_output) {
bytes_written += penultimate_level_stats.bytes_written +
penultimate_level_stats.bytes_written_blob;
}
return bytes_written;
}
uint64_t DroppedRecords() {
uint64_t output_records = stats.num_output_records;
if (has_penultimate_level_output) {
output_records += penultimate_level_stats.num_output_records;
}
if (stats.num_input_records > output_records) {
return stats.num_input_records - output_records;
}
return 0;
}
void SetMicros(uint64_t val) {
stats.micros = val;
penultimate_level_stats.micros = val;
}
void AddCpuMicros(uint64_t val) {
stats.cpu_micros += val;
penultimate_level_stats.cpu_micros += val;
}
};
// For use with CacheEntryStatsCollector
@ -403,6 +494,7 @@ class InternalStats {
for (auto& comp_stat : comp_stats_) {
comp_stat.Clear();
}
per_key_placement_comp_stats_.Clear();
for (auto& h : file_read_latency_) {
h.Clear();
}
@ -419,6 +511,15 @@ class InternalStats {
comp_stats_by_pri_[thread_pri].Add(stats);
}
void AddCompactionStats(int level, Env::Priority thread_pri,
const CompactionStatsFull& comp_stats_full) {
AddCompactionStats(level, thread_pri, comp_stats_full.stats);
if (comp_stats_full.has_penultimate_level_output) {
per_key_placement_comp_stats_.Add(
comp_stats_full.penultimate_level_stats);
}
}
void IncBytesMoved(int level, uint64_t amount) {
comp_stats_[level].bytes_moved += amount;
}
@ -479,6 +580,10 @@ class InternalStats {
return comp_stats_;
}
const CompactionStats& TEST_GetPerKeyPlacementCompactionStats() const {
return per_key_placement_comp_stats_;
}
void TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats, bool foreground);
// Store a mapping from the user-facing DB::Properties string to our
@ -518,6 +623,7 @@ class InternalStats {
// Per-ColumnFamily/level compaction stats
std::vector<CompactionStats> comp_stats_;
std::vector<CompactionStats> comp_stats_by_pri_;
CompactionStats per_key_placement_comp_stats_;
std::vector<HistogramImpl> file_read_latency_;
HistogramImpl blob_file_read_latency_;
@ -749,6 +855,23 @@ class InternalStats {
InternalStats(int /*num_levels*/, SystemClock* /*clock*/,
ColumnFamilyData* /*cfd*/) {}
// Per level compaction stats
struct CompactionOutputsStats {
uint64_t num_output_records = 0;
uint64_t bytes_written = 0;
uint64_t bytes_written_blob = 0;
uint64_t num_output_files = 0;
uint64_t num_output_files_blob = 0;
void Add(const CompactionOutputsStats& stats) {
this->num_output_records += stats.num_output_records;
this->bytes_written += stats.bytes_written;
this->bytes_written_blob += stats.bytes_written_blob;
this->num_output_files += stats.num_output_files;
this->num_output_files_blob += stats.num_output_files_blob;
}
};
struct CompactionStats {
uint64_t micros;
uint64_t cpu_micros;
@ -764,6 +887,7 @@ class InternalStats {
int num_output_files_blob;
uint64_t num_input_records;
uint64_t num_dropped_records;
uint64_t num_output_records;
int count;
explicit CompactionStats() {}
@ -774,12 +898,38 @@ class InternalStats {
void Add(const CompactionStats& /*c*/) {}
void Add(const CompactionOutputsStats& /*c*/) {}
void Subtract(const CompactionStats& /*c*/) {}
};
struct CompactionStatsFull {
// the stats for the target primary output level (per level stats)
CompactionStats stats;
// stats for output_to_penultimate_level level (per level stats)
bool has_penultimate_level_output = false;
CompactionStats penultimate_level_stats;
explicit CompactionStatsFull(){};
explicit CompactionStatsFull(CompactionReason /*reason*/, int /*c*/){};
uint64_t TotalBytesWritten() const { return 0; }
uint64_t DroppedRecords() { return 0; }
void SetMicros(uint64_t /*val*/){};
void AddCpuMicros(uint64_t /*val*/){};
};
void AddCompactionStats(int /*level*/, Env::Priority /*thread_pri*/,
const CompactionStats& /*stats*/) {}
void AddCompactionStats(int /*level*/, Env::Priority /*thread_pri*/,
const CompactionStatsFull& /*unmerged_stats*/) {}
void IncBytesMoved(int /*level*/, uint64_t /*amount*/) {}
void AddCFStats(InternalCFStatsType /*type*/, uint64_t /*value*/) {}

@ -102,5 +102,7 @@ struct CompactionJobStats {
// number of single-deletes which meet something other than a put
uint64_t num_single_del_mismatch;
// TODO: Add output_to_penultimate_level output information
};
} // namespace ROCKSDB_NAMESPACE

@ -33,7 +33,11 @@ LIB_SOURCES = \
db/compaction/compaction_picker_fifo.cc \
db/compaction/compaction_picker_level.cc \
db/compaction/compaction_picker_universal.cc \
db/compaction/compaction_service_job.cc \
db/compaction/compaction_state.cc \
db/compaction/compaction_outputs.cc \
db/compaction/sst_partitioner.cc \
db/compaction/subcompaction_state.cc \
db/convenience.cc \
db/db_filesnapshot.cc \
db/db_impl/compacted_db_impl.cc \
@ -433,6 +437,7 @@ TEST_MAIN_SOURCES = \
db/compaction/compaction_job_stats_test.cc \
db/compaction/compaction_picker_test.cc \
db/compaction/compaction_service_test.cc \
db/compaction/tiered_compaction_test.cc \
db/comparator_db_test.cc \
db/corruption_test.cc \
db/cuckoo_table_db_test.cc \

Loading…
Cancel
Save