diff --git a/CMakeLists.txt b/CMakeLists.txt index 6401ab105..4432b264a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -562,6 +562,7 @@ set(SOURCES db/compaction/compaction_picker_fifo.cc db/compaction/compaction_picker_level.cc db/compaction/compaction_picker_universal.cc + db/compaction/sst_partitioner.cc db/convenience.cc db/db_filesnapshot.cc db/db_impl/db_impl.cc diff --git a/HISTORY.md b/HISTORY.md index 53571470c..9791bfb9b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -30,6 +30,7 @@ * `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kOptionalChecksumAndDbSessionId`. By default, `BackupableDBOptions::share_files_with_checksum_naming` is set to `kOptionalChecksumAndDbSessionId`. In the default case, backup table filenames generated by this version of RocksDB are of the form either `__.sst` or `_.sst` as opposed to `__.sst`. Specifically, table filenames are of the form `__.sst` if `DBOptions::file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`. Futhermore, the checksum value `` appeared in the filenames is hexadecimal-encoded, instead of being decimal-encoded `uint32_t` value. If `DBOptions::file_checksum_gen_factory` is `nullptr`, the table filenames are of the form `_.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. Moreover, for table files generated prior to this version of RocksDB, using `kOptionalChecksumAndDbSessionId` will fall back on `kChecksumAndFileSize`. In these cases, the checksum value `` in the filenames `__.sst` is decimal-encoded `uint32_t` value as before. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true. * Added auto resume function to automatically recover the DB from background Retryable IO Error. When retryable IOError happens during flush and WAL write, the error is mapped to Hard Error and DB will be in read mode. When retryable IO Error happens during compaction, the error will be mapped to Soft Error. DB is still in write/read mode. Autoresume function will create a thread for a DB to call DB->ResumeImpl() to try the recover for Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if retryable IO Error happens. Auto resume may also cause other Retryable IO Error during the recovery, so the recovery will fail. Retry the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume retryable IO Error is disabled. Default is INT_MAX, which will lead to a infinit auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes. * Option `max_subcompactions` can be set dynamically using DB::SetDBOptions(). +* Added experimental ColumnFamilyOptions::sst_partitioner_factory to define determine the partitioning of sst files. This helps compaction to split the files on interesting boundaries (key prefixes) to make propagation of sst files less write amplifying (covering the whole key space). ### Bug Fixes * Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. diff --git a/TARGETS b/TARGETS index 19a9c4651..090dfdb32 100644 --- a/TARGETS +++ b/TARGETS @@ -138,6 +138,7 @@ cpp_library( "db/compaction/compaction_picker_fifo.cc", "db/compaction/compaction_picker_level.cc", "db/compaction/compaction_picker_universal.cc", + "db/compaction/sst_partitioner.cc", "db/convenience.cc", "db/db_filesnapshot.cc", "db/db_impl/db_impl.cc", diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 4312431d6..60e2681fa 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -7,12 +7,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/compaction/compaction.h" + #include #include #include "db/column_family.h" -#include "db/compaction/compaction.h" #include "rocksdb/compaction_filter.h" +#include "rocksdb/sst_partitioner.h" #include "test_util/sync_point.h" #include "util/string_util.h" @@ -329,6 +331,8 @@ bool Compaction::IsTrivialMove() const { // assert inputs_.size() == 1 + std::unique_ptr partitioner = CreateSstPartitioner(); + for (const auto& file : inputs_.front().files) { std::vector file_grand_parents; if (output_level_ + 1 >= number_levels_) { @@ -341,6 +345,13 @@ bool Compaction::IsTrivialMove() const { if (compaction_size > max_compaction_bytes_) { return false; } + + if (partitioner.get() != nullptr) { + if (!partitioner->CanDoTrivialMove(file->smallest.user_key(), + file->largest.user_key())) { + return false; + } + } } return true; @@ -526,6 +537,21 @@ std::unique_ptr Compaction::CreateCompactionFilter() const { context); } +std::unique_ptr Compaction::CreateSstPartitioner() const { + if (!immutable_cf_options_.sst_partitioner_factory) { + return nullptr; + } + + SstPartitioner::Context context; + context.is_full_compaction = is_full_compaction_; + context.is_manual_compaction = is_manual_compaction_; + context.output_level = output_level_; + context.smallest_user_key = smallest_user_key_; + context.largest_user_key = largest_user_key_; + return immutable_cf_options_.sst_partitioner_factory->CreatePartitioner( + context); +} + bool Compaction::IsOutputLevelEmpty() const { return inputs_.back().level != output_level_ || inputs_.back().empty(); } diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 55d4fee59..d25ffd603 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -11,6 +11,7 @@ #include "db/version_set.h" #include "memory/arena.h" #include "options/cf_options.h" +#include "rocksdb/sst_partitioner.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -256,6 +257,9 @@ class Compaction { // Create a CompactionFilter from compaction_filter_factory std::unique_ptr CreateCompactionFilter() const; + // Create a SstPartitioner from sst_partitioner_factory + std::unique_ptr CreateSstPartitioner() const; + // Is the input level corresponding to output_level_ empty? bool IsOutputLevelEmpty() const; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 13654ef36..49b94ba64 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -46,6 +46,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/sst_partitioner.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -949,6 +950,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } const auto& c_iter_stats = c_iter->iter_stats(); + std::unique_ptr partitioner = + sub_compact->compaction->output_level() == 0 + ? nullptr + : sub_compact->compaction->CreateSstPartitioner(); + std::string last_key_for_partitioner; + while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. @@ -1006,20 +1013,29 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { "CompactionJob::Run():PausingManualCompaction:2", reinterpret_cast( const_cast*>(manual_compaction_paused_))); + if (partitioner.get()) { + last_key_for_partitioner.assign(c_iter->user_key().data_, + c_iter->user_key().size_); + } c_iter->Next(); if (c_iter->status().IsManualCompactionPaused()) { break; } - if (!output_file_ended && c_iter->Valid() && - sub_compact->compaction->output_level() != 0 && - sub_compact->ShouldStopBefore(c_iter->key(), - sub_compact->current_output_file_size) && - sub_compact->builder != nullptr) { - // (2) this key belongs to the next file. For historical reasons, the - // iterator status after advancing will be given to - // FinishCompactionOutputFile(). - input_status = input->status(); - output_file_ended = true; + if (!output_file_ended && c_iter->Valid()) { + if (((partitioner.get() && + partitioner->ShouldPartition(PartitionerRequest( + last_key_for_partitioner, c_iter->user_key(), + sub_compact->current_output_file_size)) == kRequired) || + (sub_compact->compaction->output_level() != 0 && + sub_compact->ShouldStopBefore( + c_iter->key(), sub_compact->current_output_file_size))) && + sub_compact->builder != nullptr) { + // (2) this key belongs to the next file. For historical reasons, the + // iterator status after advancing will be given to + // FinishCompactionOutputFile(). + input_status = input->status(); + output_file_ended = true; + } } if (output_file_ended) { const Slice* next_key = nullptr; diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 2f9919f7d..44721021d 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -1665,6 +1665,32 @@ TEST_F(CompactionPickerTest, IsTrivialMoveOn) { ASSERT_TRUE(compaction->IsTrivialMove()); } +TEST_F(CompactionPickerTest, IsTrivialMoveOffSstPartitioned) { + mutable_cf_options_.max_bytes_for_level_base = 10000u; + mutable_cf_options_.max_compaction_bytes = 10001u; + ioptions_.level_compaction_dynamic_level_bytes = false; + ioptions_.sst_partitioner_factory = NewSstPartitionerFixedPrefixFactory(1); + NewVersionStorage(6, kCompactionStyleLevel); + // A compaction should be triggered and pick file 2 + Add(1, 1U, "100", "150", 3000U); + Add(1, 2U, "151", "200", 3001U); + Add(1, 3U, "201", "250", 3000U); + Add(1, 4U, "251", "300", 3000U); + + Add(3, 5U, "120", "130", 7000U); + Add(3, 6U, "170", "180", 7000U); + Add(3, 7U, "220", "230", 7000U); + Add(3, 8U, "270", "280", 7000U); + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + // No trivial move, because partitioning is applied + ASSERT_TRUE(!compaction->IsTrivialMove()); +} + TEST_F(CompactionPickerTest, IsTrivialMoveOff) { mutable_cf_options_.max_bytes_for_level_base = 1000000u; mutable_cf_options_.max_compaction_bytes = 10000u; diff --git a/db/compaction/sst_partitioner.cc b/db/compaction/sst_partitioner.cc new file mode 100644 index 000000000..1faa25707 --- /dev/null +++ b/db/compaction/sst_partitioner.cc @@ -0,0 +1,44 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "rocksdb/sst_partitioner.h" + +#include + +namespace ROCKSDB_NAMESPACE { + +PartitionerResult SstPartitionerFixedPrefix::ShouldPartition( + const PartitionerRequest& request) { + Slice last_key_fixed(*request.prev_user_key); + if (last_key_fixed.size() > len_) { + last_key_fixed.size_ = len_; + } + Slice current_key_fixed(*request.current_user_key); + if (current_key_fixed.size() > len_) { + current_key_fixed.size_ = len_; + } + return last_key_fixed.compare(current_key_fixed) != 0 ? kRequired + : kNotRequired; +} + +bool SstPartitionerFixedPrefix::CanDoTrivialMove( + const Slice& smallest_user_key, const Slice& largest_user_key) { + return ShouldPartition(PartitionerRequest(smallest_user_key, largest_user_key, + 0)) == kNotRequired; +} + +std::unique_ptr +SstPartitionerFixedPrefixFactory::CreatePartitioner( + const SstPartitioner::Context& /* context */) const { + return std::unique_ptr(new SstPartitionerFixedPrefix(len_)); +} + +std::shared_ptr NewSstPartitionerFixedPrefixFactory( + size_t prefix_len) { + return std::make_shared(prefix_len); +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 3be2edc3a..8441950ec 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -977,6 +977,60 @@ TEST_F(DBCompactionTest, UserKeyCrossFile2) { ASSERT_EQ("NOT_FOUND", Get("3")); } +TEST_F(DBCompactionTest, CompactionSstPartitioner) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleLevel; + options.level0_file_num_compaction_trigger = 3; + std::shared_ptr factory( + NewSstPartitionerFixedPrefixFactory(4)); + options.sst_partitioner_factory = factory; + + DestroyAndReopen(options); + + // create first file and flush to l0 + Put("aaaa1", "A"); + Put("bbbb1", "B"); + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + + Put("aaaa1", "A2"); + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + + // move both files down to l1 + dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); + + std::vector files; + dbfull()->GetLiveFilesMetaData(&files); + ASSERT_EQ(2, files.size()); + ASSERT_EQ("A2", Get("aaaa1")); + ASSERT_EQ("B", Get("bbbb1")); +} + +TEST_F(DBCompactionTest, CompactionSstPartitionerNonTrivial) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleLevel; + options.level0_file_num_compaction_trigger = 1; + std::shared_ptr factory( + NewSstPartitionerFixedPrefixFactory(4)); + options.sst_partitioner_factory = factory; + + DestroyAndReopen(options); + + // create first file and flush to l0 + Put("aaaa1", "A"); + Put("bbbb1", "B"); + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(true); + + std::vector files; + dbfull()->GetLiveFilesMetaData(&files); + ASSERT_EQ(2, files.size()); + ASSERT_EQ("A", Get("aaaa1")); + ASSERT_EQ("B", Get("bbbb1")); +} + TEST_F(DBCompactionTest, ZeroSeqIdCompaction) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleLevel; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 61deac42f..5c75bd0d1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -10,6 +10,7 @@ #include #include + #include #include #include @@ -21,6 +22,7 @@ #include "rocksdb/env.h" #include "rocksdb/file_checksum.h" #include "rocksdb/listener.h" +#include "rocksdb/sst_partitioner.h" #include "rocksdb/universal_compaction.h" #include "rocksdb/version.h" #include "rocksdb/write_buffer_manager.h" @@ -308,6 +310,15 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Default: nullptr std::shared_ptr compaction_thread_limiter = nullptr; + // If non-nullptr, use the specified factory for a function to determine the + // partitioning of sst files. This helps compaction to split the files + // on interesting boundaries (key prefixes) to make propagation of sst + // files less write amplifying (covering the whole key space). + // THE FEATURE IS STILL EXPERIMENTAL + // + // Default: nullptr + std::shared_ptr sst_partitioner_factory = nullptr; + // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/include/rocksdb/sst_partitioner.h b/include/rocksdb/sst_partitioner.h new file mode 100644 index 000000000..5d181958f --- /dev/null +++ b/include/rocksdb/sst_partitioner.h @@ -0,0 +1,135 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#pragma once + +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +class Slice; + +enum PartitionerResult : char { + // Partitioner does not require to create new file + kNotRequired = 0x0, + // Partitioner is requesting forcefully to create new file + kRequired = 0x1 + // Additional constants can be added +}; + +struct PartitionerRequest { + PartitionerRequest(const Slice& prev_user_key_, + const Slice& current_user_key_, + uint64_t current_output_file_size_) + : prev_user_key(&prev_user_key_), + current_user_key(¤t_user_key_), + current_output_file_size(current_output_file_size_) {} + const Slice* prev_user_key; + const Slice* current_user_key; + uint64_t current_output_file_size; +}; + +/* + * A SstPartitioner is a generic pluggable way of defining the partition + * of SST files. Compaction job will split the SST files on partition boundary + * to lower the write amplification during SST file promote to higher level. + */ +class SstPartitioner { + public: + virtual ~SstPartitioner() {} + + // Return the name of this partitioner. + virtual const char* Name() const = 0; + + // It is called for all keys in compaction. When partitioner want to create + // new SST file it needs to return true. It means compaction job will finish + // current SST file where last key is "prev_user_key" parameter and start new + // SST file where first key is "current_user_key". Returns decission if + // partition boundary was detected and compaction should create new file. + virtual PartitionerResult ShouldPartition( + const PartitionerRequest& request) = 0; + + // Called with smallest and largest keys in SST file when compation try to do + // trivial move. Returns true is partitioner allows to do trivial move. + virtual bool CanDoTrivialMove(const Slice& smallest_user_key, + const Slice& largest_user_key) = 0; + + // Context information of a compaction run + struct Context { + // Does this compaction run include all data files + bool is_full_compaction; + // Is this compaction requested by the client (true), + // or is it occurring as an automatic compaction process + bool is_manual_compaction; + // Output level for this compaction + int output_level; + // Smallest key for compaction + Slice smallest_user_key; + // Largest key for compaction + Slice largest_user_key; + }; +}; + +class SstPartitionerFactory { + public: + virtual ~SstPartitionerFactory() {} + + virtual std::unique_ptr CreatePartitioner( + const SstPartitioner::Context& context) const = 0; + + // Returns a name that identifies this partitioner factory. + virtual const char* Name() const = 0; +}; + +/* + * Fixed key prefix partitioner. It splits the output SST files when prefix + * defined by size changes. + */ +class SstPartitionerFixedPrefix : public SstPartitioner { + public: + explicit SstPartitionerFixedPrefix(size_t len) : len_(len) {} + + virtual ~SstPartitionerFixedPrefix() override {} + + const char* Name() const override { return "SstPartitionerFixedPrefix"; } + + PartitionerResult ShouldPartition(const PartitionerRequest& request) override; + + bool CanDoTrivialMove(const Slice& smallest_user_key, + const Slice& largest_user_key) override; + + private: + size_t len_; +}; + +/* + * Factory for fixed prefix partitioner. + */ +class SstPartitionerFixedPrefixFactory : public SstPartitionerFactory { + public: + explicit SstPartitionerFixedPrefixFactory(size_t len) : len_(len) {} + + virtual ~SstPartitionerFixedPrefixFactory() {} + + const char* Name() const override { + return "SstPartitionerFixedPrefixFactory"; + } + + std::unique_ptr CreatePartitioner( + const SstPartitioner::Context& /* context */) const override; + + private: + size_t len_; +}; + +extern std::shared_ptr +NewSstPartitionerFixedPrefixFactory(size_t prefix_len); + +} // namespace ROCKSDB_NAMESPACE diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 71745d259..cedf8910d 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -56,6 +56,7 @@ set(JNI_NATIVE_SOURCES rocksjni/sst_file_writerjni.cc rocksjni/sst_file_readerjni.cc rocksjni/sst_file_reader_iterator.cc + rocksjni/sst_partitioner.cc rocksjni/statistics.cc rocksjni/statisticsjni.cc rocksjni/table.cc @@ -201,9 +202,11 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/Snapshot.java src/main/java/org/rocksdb/SstFileManager.java src/main/java/org/rocksdb/SstFileMetaData.java - src/main/java/org/rocksdb/SstFileWriter.java src/main/java/org/rocksdb/SstFileReader.java src/main/java/org/rocksdb/SstFileReaderIterator.java + src/main/java/org/rocksdb/SstFileWriter.java + src/main/java/org/rocksdb/SstPartitionerFactory.java + src/main/java/org/rocksdb/SstPartitionerFixedPrefixFactory.java src/main/java/org/rocksdb/StateType.java src/main/java/org/rocksdb/StatisticsCollectorCallback.java src/main/java/org/rocksdb/StatisticsCollector.java @@ -452,6 +455,8 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" org.rocksdb.SstFileWriter org.rocksdb.SstFileReader org.rocksdb.SstFileReaderIterator + org.rocksdb.SstPartitionerFactory + org.rocksdb.SstPartitionerFixedPrefixFactory org.rocksdb.Statistics org.rocksdb.StringAppendOperator org.rocksdb.TableFormatConfig diff --git a/java/Makefile b/java/Makefile index c233f4f59..8b2dfba08 100644 --- a/java/Makefile +++ b/java/Makefile @@ -63,6 +63,8 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.SstFileWriter\ org.rocksdb.SstFileReader\ org.rocksdb.SstFileReaderIterator\ + org.rocksdb.SstPartitionerFactory\ + org.rocksdb.SstPartitionerFixedPrefixFactory\ org.rocksdb.Statistics\ org.rocksdb.ThreadStatus\ org.rocksdb.TimedEnv\ @@ -165,6 +167,7 @@ JAVA_TESTS = \ org.rocksdb.SstFileManagerTest\ org.rocksdb.SstFileWriterTest\ org.rocksdb.SstFileReaderTest\ + org.rocksdb.SstPartitionerTest\ org.rocksdb.TableFilterTest\ org.rocksdb.TimedEnvTest\ org.rocksdb.TransactionTest\ diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index dbc932279..4979621a2 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -6,9 +6,12 @@ // This file implements the "bridge" between Java and C++ for // ROCKSDB_NAMESPACE::Options. +#include "rocksdb/options.h" + #include #include #include + #include #include @@ -19,22 +22,20 @@ #include "include/org_rocksdb_Options.h" #include "include/org_rocksdb_ReadOptions.h" #include "include/org_rocksdb_WriteOptions.h" - -#include "rocksjni/comparatorjnicallback.h" -#include "rocksjni/portal.h" -#include "rocksjni/statisticsjni.h" -#include "rocksjni/table_filter_jnicallback.h" - #include "rocksdb/comparator.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" #include "rocksdb/merge_operator.h" -#include "rocksdb/options.h" #include "rocksdb/rate_limiter.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/sst_partitioner.h" #include "rocksdb/statistics.h" #include "rocksdb/table.h" +#include "rocksjni/comparatorjnicallback.h" +#include "rocksjni/portal.h" +#include "rocksjni/statisticsjni.h" +#include "rocksjni/table_filter_jnicallback.h" #include "utilities/merge_operators.h" /* @@ -1130,6 +1131,20 @@ void Java_org_rocksdb_Options_setTableFactory( options->table_factory.reset(table_factory); } +/* + * Method: setSstPartitionerFactory + * Signature: (JJ)V + */ +void Java_org_rocksdb_Options_setSstPartitionerFactory(JNIEnv*, jobject, + jlong jhandle, + jlong factory_handle) { + auto* options = reinterpret_cast(jhandle); + auto factory = reinterpret_cast< + std::shared_ptr*>( + factory_handle); + options->sst_partitioner_factory = *factory; +} + /* * Class: org_rocksdb_Options * Method: allowMmapReads @@ -3621,6 +3636,19 @@ void Java_org_rocksdb_ColumnFamilyOptions_setTableFactory( reinterpret_cast(jfactory_handle)); } +/* + * Method: setSstPartitionerFactory + * Signature: (JJ)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setSstPartitionerFactory( + JNIEnv*, jobject, jlong jhandle, jlong factory_handle) { + auto* options = + reinterpret_cast(jhandle); + auto* factory = reinterpret_cast( + factory_handle); + options->sst_partitioner_factory.reset(factory); +} + /* * Method: tableFactoryName * Signature: (J)Ljava/lang/String diff --git a/java/rocksjni/sst_partitioner.cc b/java/rocksjni/sst_partitioner.cc new file mode 100644 index 000000000..92c0ebdda --- /dev/null +++ b/java/rocksjni/sst_partitioner.cc @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ and enables +// calling C++ ROCKSDB_NAMESPACE::SstFileManager methods +// from Java side. + +#include "rocksdb/sst_partitioner.h" + +#include + +#include + +#include "include/org_rocksdb_SstPartitionerFixedPrefixFactory.h" +#include "rocksdb/sst_file_manager.h" +#include "rocksjni/portal.h" + +/* + * Class: org_rocksdb_SstPartitionerFixedPrefixFactory + * Method: newSstPartitionerFixedPrefixFactory0 + * Signature: (J)J + */ +jlong Java_org_rocksdb_SstPartitionerFixedPrefixFactory_newSstPartitionerFixedPrefixFactory0( + JNIEnv*, jclass, jlong prefix_len) { + auto* ptr = new std::shared_ptr( + ROCKSDB_NAMESPACE::NewSstPartitionerFixedPrefixFactory(prefix_len)); + return reinterpret_cast(ptr); +} + +/* + * Class: org_rocksdb_SstPartitionerFixedPrefixFactory + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_SstPartitionerFixedPrefixFactory_disposeInternal( + JNIEnv*, jobject, jlong jhandle) { + auto* ptr = reinterpret_cast< + std::shared_ptr*>(jhandle); + delete ptr; // delete std::shared_ptr +} diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index f7852d37d..53fc1af6e 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -844,6 +844,18 @@ public class ColumnFamilyOptions extends RocksObject return forceConsistencyChecks(nativeHandle_); } + @Override + public ColumnFamilyOptions setSstPartitionerFactory(SstPartitionerFactory sstPartitionerFactory) { + setSstPartitionerFactory(nativeHandle_, sstPartitionerFactory.nativeHandle_); + this.sstPartitionerFactory_ = sstPartitionerFactory; + return this; + } + + @Override + public SstPartitionerFactory sstPartitionerFactory() { + return sstPartitionerFactory_; + } + private static native long getColumnFamilyOptionsFromProps( final long cfgHandle, String optString); private static native long getColumnFamilyOptionsFromProps(final String optString); @@ -1005,6 +1017,7 @@ public class ColumnFamilyOptions extends RocksObject private native void setForceConsistencyChecks(final long handle, final boolean forceConsistencyChecks); private native boolean forceConsistencyChecks(final long handle); + private native void setSstPartitionerFactory(long nativeHandle_, long newFactoryHandle); // instance variables // NOTE: If you add new member variables, please update the copy constructor above! @@ -1018,5 +1031,5 @@ public class ColumnFamilyOptions extends RocksObject private CompactionOptionsFIFO compactionOptionsFIFO_; private CompressionOptions bottommostCompressionOptions_; private CompressionOptions compressionOptions_; - + private SstPartitionerFactory sstPartitionerFactory_; } diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java index b02c6c236..0a8224e19 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java @@ -437,6 +437,23 @@ public interface ColumnFamilyOptionsInterface metadata = db.getLiveFilesMetaData(); + assertThat(metadata.size()).isEqualTo(2); + } + } +} diff --git a/options/cf_options.cc b/options/cf_options.cc index 9e1bbcb11..c39b420aa 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -763,7 +763,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, cf_options.memtable_insert_with_hint_prefix_extractor.get()), cf_paths(cf_options.cf_paths), compaction_thread_limiter(cf_options.compaction_thread_limiter), - file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()) {} + file_checksum_gen_factory(db_options.file_checksum_gen_factory.get()), + sst_partitioner_factory(cf_options.sst_partitioner_factory) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index 4c24dd706..4aab909ea 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -119,6 +119,8 @@ struct ImmutableCFOptions { std::shared_ptr compaction_thread_limiter; FileChecksumGenFactory* file_checksum_gen_factory; + + std::shared_ptr sst_partitioner_factory; }; struct MutableCFOptions { diff --git a/options/options.cc b/options/options.cc index f9983d069..599886d4e 100644 --- a/options/options.cc +++ b/options/options.cc @@ -24,6 +24,7 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/sst_file_manager.h" +#include "rocksdb/sst_partitioner.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "rocksdb/wal_filter.h" @@ -122,6 +123,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER( log, " Options.compaction_filter_factory: %s", compaction_filter_factory ? compaction_filter_factory->Name() : "None"); + ROCKS_LOG_HEADER( + log, " Options.sst_partitioner_factory: %s", + sst_partitioner_factory ? sst_partitioner_factory->Name() : "None"); ROCKS_LOG_HEADER(log, " Options.memtable_factory: %s", memtable_factory->Name()); ROCKS_LOG_HEADER(log, " Options.table_factory: %s", diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index a87fd1e6d..ff945694f 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -387,6 +387,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { {offset_of(&ColumnFamilyOptions::cf_paths), sizeof(std::vector)}, {offset_of(&ColumnFamilyOptions::compaction_thread_limiter), sizeof(std::shared_ptr)}, + {offset_of(&ColumnFamilyOptions::sst_partitioner_factory), + sizeof(std::shared_ptr)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; @@ -425,6 +427,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { options->purge_redundant_kvs_while_flush = false; options->max_mem_compaction_level = 0; options->compaction_filter = nullptr; + options->sst_partitioner_factory = nullptr; char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)]; ColumnFamilyOptions* new_options = diff --git a/src.mk b/src.mk index a8592ff3a..0656b6f0c 100644 --- a/src.mk +++ b/src.mk @@ -22,6 +22,7 @@ LIB_SOURCES = \ db/compaction/compaction_picker_fifo.cc \ db/compaction/compaction_picker_level.cc \ db/compaction/compaction_picker_universal.cc \ + db/compaction/sst_partitioner.cc \ db/convenience.cc \ db/db_filesnapshot.cc \ db/db_impl/db_impl.cc \ @@ -554,6 +555,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/sst_file_writerjni.cc \ java/rocksjni/sst_file_readerjni.cc \ java/rocksjni/sst_file_reader_iterator.cc \ + java/rocksjni/sst_partitioner.cc \ java/rocksjni/statistics.cc \ java/rocksjni/statisticsjni.cc \ java/rocksjni/table.cc \