Add sample_for_compression results to table properties (#8139)

Summary:
Added `TableProperties::{fast,slow}_compression_estimated_data_size`.
These properties are present in block-based tables when
`ColumnFamilyOptions::sample_for_compression > 0` and the necessary
compression library is supported when the file is generated. They
contain estimates of what `TableProperties::data_size` would be if the
"fast"/"slow" compression library had been used instead. One
limitation is we do not record exactly which "fast" (LZ4 or Snappy)
or "slow" (ZSTD or Zlib) compression library produced the result.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8139

Test Plan:
- new unit test
- ran `db_bench` with `sample_for_compression=1`; verified the `data_size` property matches the `{slow,fast}_compression_estimated_data_size` when the same compression type is used for the output file compression and the sampled compression

Reviewed By: riversand963

Differential Revision: D27454338

Pulled By: ajkr

fbshipit-source-id: 9529293de93ddac7f03b2e149d746e9f634abac4
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent a781b103da
commit 1ba2b8a568
  1. 3
      HISTORY.md
  2. 74
      db/db_properties_test.cc
  3. 8
      db/event_helpers.cc
  4. 10
      include/rocksdb/table_properties.h
  5. 29
      java/rocksjni/portal.h
  6. 2
      java/rocksjni/testable_event_listener.cc
  7. 43
      java/src/main/java/org/rocksdb/TableProperties.java
  8. 7
      java/src/test/java/org/rocksdb/EventListenerTest.java
  9. 53
      table/block_based/block_based_table_builder.cc
  10. 12
      table/meta_blocks.cc
  11. 17
      table/table_properties.cc

@ -12,6 +12,9 @@
### Performance Improvements
* On ARM platform, use `yield` instead of `wfe` to relax cpu to gain better performance.
### Public API change
* Added `TableProperties::slow_compression_estimated_data_size` and `TableProperties::fast_compression_estimated_data_size`. When `ColumnFamilyOptions::sample_for_compression > 0`, they estimate what `TableProperties::data_size` would have been if the "fast" or "slow" (see `ColumnFamilyOptions::sample_for_compression` API doc for definitions) compression had been used instead.
## 6.19.0 (03/21/2021)
### Bug Fixes
* Fixed the truncation error found in APIs/tools when dumping block-based SST files in a human-readable format. After fix, the block-based table can be fully dumped as a readable file.

@ -1520,6 +1520,80 @@ TEST_F(DBPropertiesTest, BlockAddForCompressionSampling) {
}
}
// Value-parameterized fixture for the compression-sampling table property
// tests. The bool parameter is captured once at construction in `fast_`.
class CompressionSamplingDBPropertiesTest
: public DBPropertiesTest,
public ::testing::WithParamInterface<bool> {
public:
CompressionSamplingDBPropertiesTest() : fast_(GetParam()) {}
protected:
// Test parameter: true exercises the "fast" compression estimate, false the
// "slow" compression estimate.
const bool fast_;
};
// Instantiate the suite over the values produced by `::testing::Bool()`.
INSTANTIATE_TEST_CASE_P(CompressionSamplingDBPropertiesTest,
CompressionSamplingDBPropertiesTest, ::testing::Bool());
// Excluded from RocksDB lite tests due to `GetPropertiesOfAllTables()` usage.
//
// Checks that, with 100% sampling and the output files themselves compressed
// with a "fast" (or "slow") library, each file's
// `{fast,slow}_compression_estimated_data_size` property equals its actual
// `data_size`.
TEST_P(CompressionSamplingDBPropertiesTest,
EstimateDataSizeWithCompressionSampling) {
Options options = CurrentOptions();
if (fast_) {
// One of the following light compression libraries must be present.
if (LZ4_Supported()) {
options.compression = kLZ4Compression;
} else if (Snappy_Supported()) {
options.compression = kSnappyCompression;
} else {
// No light compression library available: nothing to verify, so the test
// returns early without asserting anything.
return;
}
} else {
// One of the following heavy compression libraries must be present.
if (ZSTD_Supported()) {
options.compression = kZSTD;
} else if (Zlib_Supported()) {
options.compression = kZlibCompression;
} else {
// No heavy compression library available: nothing to verify, so the test
// returns early without asserting anything.
return;
}
}
// Files should only move between levels via the explicit `CompactRange()`
// below, keeping the LSM shape predictable.
options.disable_auto_compactions = true;
// For simplicity/determinism, sample 100%.
options.sample_for_compression = 1;
Reopen(options);
// Setup the following LSM:
//
// L0_0 ["a", "b"]
// L1_0 ["a", "b"]
//
// L0_0 was created by flush. L1_0 was created by compaction. Each file
// contains one data block. The value consists of compressible data so the
// data block should be stored compressed.
std::string val(1024, 'a');
// Three flushes in total: the manual compaction runs after the second flush
// (i == 1), and the third flush then produces the remaining L0 file.
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("a", val));
ASSERT_OK(Put("b", val));
ASSERT_OK(Flush());
if (i == 1) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
}
TablePropertiesCollection file_to_props;
ASSERT_OK(db_->GetPropertiesOfAllTables(&file_to_props));
// Expect exactly the two files described in the LSM diagram above.
ASSERT_EQ(2, file_to_props.size());
for (const auto& file_and_props : file_to_props) {
ASSERT_GT(file_and_props.second->data_size, 0);
// Only the estimate matching the configured compression family is compared
// against the real data size; the other estimate is not checked here.
if (fast_) {
ASSERT_EQ(file_and_props.second->data_size,
file_and_props.second->fast_compression_estimated_data_size);
} else {
ASSERT_EQ(file_and_props.second->data_size,
file_and_props.second->slow_compression_estimated_data_size);
}
}
}
TEST_F(DBPropertiesTest, EstimateNumKeysUnderflow) {
Options options = CurrentOptions();
Reopen(options);

@ -125,8 +125,12 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished(
<< table_properties.compression_options << "creation_time"
<< table_properties.creation_time << "oldest_key_time"
<< table_properties.oldest_key_time << "file_creation_time"
<< table_properties.file_creation_time << "db_id"
<< table_properties.db_id << "db_session_id"
<< table_properties.file_creation_time
<< "slow_compression_estimated_data_size"
<< table_properties.slow_compression_estimated_data_size
<< "fast_compression_estimated_data_size"
<< table_properties.fast_compression_estimated_data_size
<< "db_id" << table_properties.db_id << "db_session_id"
<< table_properties.db_session_id;
// user collected properties

@ -61,6 +61,8 @@ struct TablePropertiesNames {
static const std::string kCreationTime;
static const std::string kOldestKeyTime;
static const std::string kFileCreationTime;
static const std::string kSlowCompressionEstimatedDataSize;
static const std::string kFastCompressionEstimatedDataSize;
};
extern const std::string kPropertiesBlock;
@ -195,6 +197,14 @@ struct TableProperties {
uint64_t oldest_key_time = 0;
// Actual SST file creation time. 0 means unknown.
uint64_t file_creation_time = 0;
// Estimated size of data blocks if compressed using a relatively slower
// compression algorithm (see `ColumnFamilyOptions::sample_for_compression`).
// 0 means unknown.
uint64_t slow_compression_estimated_data_size = 0;
// Estimated size of data blocks if compressed using a relatively faster
// compression algorithm (see `ColumnFamilyOptions::sample_for_compression`).
// 0 means unknown.
uint64_t fast_compression_estimated_data_size = 0;
// DB identity
// db_id is an identifier generated the first time the DB is created

@ -6075,7 +6075,11 @@ class TablePropertiesJni : public JavaClass {
return nullptr;
}
jmethodID mid = env->GetMethodID(jclazz, "<init>", "(JJJJJJJJJJJJJJJJJJJ[BLjava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)V");
jmethodID mid = env->GetMethodID(
jclazz, "<init>",
"(JJJJJJJJJJJJJJJJJJJJJ[BLjava/lang/String;Ljava/lang/String;Ljava/"
"lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/"
"String;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)V");
if (mid == nullptr) {
// exception thrown: NoSuchMethodException or OutOfMemoryError
return nullptr;
@ -6201,8 +6205,8 @@ class TablePropertiesJni : public JavaClass {
return nullptr;
}
jobject jtable_properties = env->NewObject(jclazz, mid,
static_cast<jlong>(table_properties.data_size),
jobject jtable_properties = env->NewObject(
jclazz, mid, static_cast<jlong>(table_properties.data_size),
static_cast<jlong>(table_properties.index_size),
static_cast<jlong>(table_properties.index_partitions),
static_cast<jlong>(table_properties.top_level_index_size),
@ -6221,17 +6225,14 @@ class TablePropertiesJni : public JavaClass {
static_cast<jlong>(table_properties.column_family_id),
static_cast<jlong>(table_properties.creation_time),
static_cast<jlong>(table_properties.oldest_key_time),
jcolumn_family_name,
jfilter_policy_name,
jcomparator_name,
jmerge_operator_name,
jprefix_extractor_name,
jproperty_collectors_names,
jcompression_name,
juser_collected_properties,
jreadable_properties,
jproperties_offsets
);
static_cast<jlong>(
table_properties.slow_compression_estimated_data_size),
static_cast<jlong>(
table_properties.fast_compression_estimated_data_size),
jcolumn_family_name, jfilter_policy_name, jcomparator_name,
jmerge_operator_name, jprefix_extractor_name,
jproperty_collectors_names, jcompression_name,
juser_collected_properties, jreadable_properties, jproperties_offsets);
if (env->ExceptionCheck()) {
return nullptr;

@ -35,6 +35,8 @@ static TableProperties newTablePropertiesForTest() {
table_properties.creation_time = UINT64_MAX;
table_properties.oldest_key_time = UINT64_MAX;
table_properties.file_creation_time = UINT64_MAX;
table_properties.slow_compression_estimated_data_size = UINT64_MAX;
table_properties.fast_compression_estimated_data_size = UINT64_MAX;
table_properties.db_id = "dbId";
table_properties.db_session_id = "sessionId";
table_properties.column_family_name = "columnFamilyName";

@ -29,6 +29,8 @@ public class TableProperties {
private final long columnFamilyId;
private final long creationTime;
private final long oldestKeyTime;
private final long slowCompressionEstimatedDataSize;
private final long fastCompressionEstimatedDataSize;
private final byte[] columnFamilyName;
private final String filterPolicyName;
private final String comparatorName;
@ -50,10 +52,12 @@ public class TableProperties {
final long rawValueSize, final long numDataBlocks, final long numEntries,
final long numDeletions, final long numMergeOperands, final long numRangeDeletions,
final long formatVersion, final long fixedKeyLen, final long columnFamilyId,
final long creationTime, final long oldestKeyTime, final byte[] columnFamilyName,
final String filterPolicyName, final String comparatorName, final String mergeOperatorName,
final String prefixExtractorName, final String propertyCollectorsNames,
final String compressionName, final Map<String, String> userCollectedProperties,
final long creationTime, final long oldestKeyTime,
final long slowCompressionEstimatedDataSize, final long fastCompressionEstimatedDataSize,
final byte[] columnFamilyName, final String filterPolicyName, final String comparatorName,
final String mergeOperatorName, final String prefixExtractorName,
final String propertyCollectorsNames, final String compressionName,
final Map<String, String> userCollectedProperties,
final Map<String, String> readableProperties, final Map<String, Long> propertiesOffsets) {
this.dataSize = dataSize;
this.indexSize = indexSize;
@ -74,6 +78,8 @@ public class TableProperties {
this.columnFamilyId = columnFamilyId;
this.creationTime = creationTime;
this.oldestKeyTime = oldestKeyTime;
this.slowCompressionEstimatedDataSize = slowCompressionEstimatedDataSize;
this.fastCompressionEstimatedDataSize = fastCompressionEstimatedDataSize;
this.columnFamilyName = columnFamilyName;
this.filterPolicyName = filterPolicyName;
this.comparatorName = comparatorName;
@ -266,6 +272,26 @@ public class TableProperties {
return oldestKeyTime;
}
/**
* Get the estimated size of data blocks compressed with a relatively slower
* compression algorithm.
*
* @return 0 means unknown, otherwise the estimated size of the data blocks.
*/
public long getSlowCompressionEstimatedDataSize() {
return slowCompressionEstimatedDataSize;
}
/**
* Get the estimated size of data blocks compressed with a relatively faster
* compression algorithm.
*
* @return 0 means unknown, otherwise the estimated size of the data blocks.
*/
public long getFastCompressionEstimatedDataSize() {
return fastCompressionEstimatedDataSize;
}
/**
* Get the name of the column family with which this
* SST file is associated.
@ -380,6 +406,8 @@ public class TableProperties {
&& formatVersion == that.formatVersion && fixedKeyLen == that.fixedKeyLen
&& columnFamilyId == that.columnFamilyId && creationTime == that.creationTime
&& oldestKeyTime == that.oldestKeyTime
&& slowCompressionEstimatedDataSize == that.slowCompressionEstimatedDataSize
&& fastCompressionEstimatedDataSize == that.fastCompressionEstimatedDataSize
&& Arrays.equals(columnFamilyName, that.columnFamilyName)
&& Objects.equals(filterPolicyName, that.filterPolicyName)
&& Objects.equals(comparatorName, that.comparatorName)
@ -397,9 +425,10 @@ public class TableProperties {
int result = Objects.hash(dataSize, indexSize, indexPartitions, topLevelIndexSize,
indexKeyIsUserKey, indexValueIsDeltaEncoded, filterSize, rawKeySize, rawValueSize,
numDataBlocks, numEntries, numDeletions, numMergeOperands, numRangeDeletions, formatVersion,
fixedKeyLen, columnFamilyId, creationTime, oldestKeyTime, filterPolicyName, comparatorName,
mergeOperatorName, prefixExtractorName, propertyCollectorsNames, compressionName,
userCollectedProperties, readableProperties, propertiesOffsets);
fixedKeyLen, columnFamilyId, creationTime, oldestKeyTime, slowCompressionEstimatedDataSize,
fastCompressionEstimatedDataSize, filterPolicyName, comparatorName, mergeOperatorName,
prefixExtractorName, propertyCollectorsNames, compressionName, userCollectedProperties,
readableProperties, propertiesOffsets);
result = 31 * result + Arrays.hashCode(columnFamilyName);
return result;
}

@ -244,9 +244,10 @@ public class EventListenerTest {
TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL,
TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL,
TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL, TEST_LONG_VAL,
"columnFamilyName".getBytes(), "filterPolicyName", "comparatorName", "mergeOperatorName",
"prefixExtractorName", "propertyCollectorsNames", "compressionName",
userCollectedPropertiesTestData, readablePropertiesTestData, propertiesOffsetsTestData);
TEST_LONG_VAL, TEST_LONG_VAL, "columnFamilyName".getBytes(), "filterPolicyName",
"comparatorName", "mergeOperatorName", "prefixExtractorName", "propertyCollectorsNames",
"compressionName", userCollectedPropertiesTestData, readablePropertiesTestData,
propertiesOffsetsTestData);
final FlushJobInfo flushJobInfoTestData = new FlushJobInfo(Integer.MAX_VALUE,
"testColumnFamily", "/file/path", TEST_LONG_VAL, Integer.MAX_VALUE, true, true,
TEST_LONG_VAL, TEST_LONG_VAL, tablePropertiesTestData, (byte) 0x0a);

@ -271,6 +271,11 @@ struct BlockBasedTableBuilder::Rep {
const Slice* first_key_in_next_block = nullptr;
CompressionType compression_type;
uint64_t sample_for_compression;
std::atomic<uint64_t> compressible_input_data_bytes;
std::atomic<uint64_t> uncompressible_input_data_bytes;
std::atomic<uint64_t> sampled_input_data_bytes;
std::atomic<uint64_t> sampled_output_slow_data_bytes;
std::atomic<uint64_t> sampled_output_fast_data_bytes;
CompressionOptions compression_opts;
std::unique_ptr<CompressionDict> compression_dict;
std::vector<std::unique_ptr<CompressionContext>> compression_ctxs;
@ -431,6 +436,11 @@ struct BlockBasedTableBuilder::Rep {
internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type),
sample_for_compression(_moptions.sample_for_compression),
compressible_input_data_bytes(0),
uncompressible_input_data_bytes(0),
sampled_input_data_bytes(0),
sampled_output_slow_data_bytes(0),
sampled_output_fast_data_bytes(0),
compression_opts(_compression_opts),
compression_dict(),
compression_ctxs(_compression_opts.parallel_threads),
@ -1085,6 +1095,10 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
if (is_data_block) {
r->compressible_input_data_bytes.fetch_add(raw_block_contents.size(),
std::memory_order_relaxed);
}
const CompressionDict* compression_dict;
if (!is_data_block || r->compression_dict == nullptr) {
compression_dict = &CompressionDict::GetEmptyDict();
@ -1103,6 +1117,16 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
r->table_options.format_version, is_data_block /* do_sample */,
compressed_output, &sampled_output_fast, &sampled_output_slow);
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
// Currently compression sampling is only enabled for data block.
assert(is_data_block);
r->sampled_input_data_bytes.fetch_add(raw_block_contents.size(),
std::memory_order_relaxed);
r->sampled_output_slow_data_bytes.fetch_add(sampled_output_slow.size(),
std::memory_order_relaxed);
r->sampled_output_fast_data_bytes.fetch_add(sampled_output_fast.size(),
std::memory_order_relaxed);
}
// notify collectors on block add
NotifyCollectTableCollectorsOnBlockAdd(
r->table_properties_collectors, raw_block_contents.size(),
@ -1146,8 +1170,16 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
}
} else {
// Block is too big to be compressed.
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(raw_block_contents.size(),
std::memory_order_relaxed);
}
abort_compression = true;
}
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
std::memory_order_relaxed);
}
// Abort compression if the block is too big, or did not pass
// verification.
@ -1539,6 +1571,27 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
rep_->props.creation_time = rep_->creation_time;
rep_->props.oldest_key_time = rep_->oldest_key_time;
rep_->props.file_creation_time = rep_->file_creation_time;
if (rep_->sampled_input_data_bytes > 0) {
rep_->props.slow_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(rep_->sampled_output_slow_data_bytes) /
rep_->sampled_input_data_bytes *
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes + 0.5);
rep_->props.fast_compression_estimated_data_size = static_cast<uint64_t>(
static_cast<double>(rep_->sampled_output_fast_data_bytes) /
rep_->sampled_input_data_bytes *
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes + 0.5);
} else if (rep_->sample_for_compression > 0) {
// We tried to sample but none were found. Assume worst-case (compression
// ratio 1.0) so data is complete and aggregatable.
rep_->props.slow_compression_estimated_data_size =
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes;
rep_->props.fast_compression_estimated_data_size =
rep_->compressible_input_data_bytes +
rep_->uncompressible_input_data_bytes;
}
rep_->props.db_id = rep_->db_id;
rep_->props.db_session_id = rep_->db_session_id;
rep_->props.db_host_id = rep_->db_host_id;

@ -96,6 +96,14 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
if (props.file_creation_time > 0) {
Add(TablePropertiesNames::kFileCreationTime, props.file_creation_time);
}
if (props.slow_compression_estimated_data_size > 0) {
Add(TablePropertiesNames::kSlowCompressionEstimatedDataSize,
props.slow_compression_estimated_data_size);
}
if (props.fast_compression_estimated_data_size > 0) {
Add(TablePropertiesNames::kFastCompressionEstimatedDataSize,
props.fast_compression_estimated_data_size);
}
if (!props.db_id.empty()) {
Add(TablePropertiesNames::kDbId, props.db_id);
}
@ -279,6 +287,10 @@ Status ReadProperties(const ReadOptions& read_options,
&new_table_properties->oldest_key_time},
{TablePropertiesNames::kFileCreationTime,
&new_table_properties->file_creation_time},
{TablePropertiesNames::kSlowCompressionEstimatedDataSize,
&new_table_properties->slow_compression_estimated_data_size},
{TablePropertiesNames::kFastCompressionEstimatedDataSize,
&new_table_properties->fast_compression_estimated_data_size},
};
std::string last_key;

@ -168,6 +168,11 @@ std::string TableProperties::ToString(
AppendProperty(result, "file creation time", file_creation_time, prop_delim,
kv_delim);
AppendProperty(result, "slow compression estimated data size",
slow_compression_estimated_data_size, prop_delim, kv_delim);
AppendProperty(result, "fast compression estimated data size",
fast_compression_estimated_data_size, prop_delim, kv_delim);
// DB identity and DB session ID
AppendProperty(result, "DB identity", db_id, prop_delim, kv_delim);
AppendProperty(result, "DB session identity", db_session_id, prop_delim,
@ -191,6 +196,10 @@ void TableProperties::Add(const TableProperties& tp) {
num_deletions += tp.num_deletions;
num_merge_operands += tp.num_merge_operands;
num_range_deletions += tp.num_range_deletions;
slow_compression_estimated_data_size +=
tp.slow_compression_estimated_data_size;
fast_compression_estimated_data_size +=
tp.fast_compression_estimated_data_size;
}
std::map<std::string, uint64_t>
@ -208,6 +217,10 @@ TableProperties::GetAggregatablePropertiesAsMap() const {
rv["num_deletions"] = num_deletions;
rv["num_merge_operands"] = num_merge_operands;
rv["num_range_deletions"] = num_range_deletions;
rv["slow_compression_estimated_data_size"] =
slow_compression_estimated_data_size;
rv["fast_compression_estimated_data_size"] =
fast_compression_estimated_data_size;
return rv;
}
@ -268,6 +281,10 @@ const std::string TablePropertiesNames::kOldestKeyTime =
"rocksdb.oldest.key.time";
const std::string TablePropertiesNames::kFileCreationTime =
"rocksdb.file.creation.time";
const std::string TablePropertiesNames::kSlowCompressionEstimatedDataSize =
"rocksdb.sample_for_compression.slow.data.size";
const std::string TablePropertiesNames::kFastCompressionEstimatedDataSize =
"rocksdb.sample_for_compression.fast.data.size";
extern const std::string kPropertiesBlock = "rocksdb.properties";
// Old property block name for backward compatibility

Loading…
Cancel
Save