New BlockBasedTable version -- better compressed block format

Summary:
This diff adds BlockBasedTable format_version = 2. New format version brings better compressed block format for these compressions:
1) Zlib -- encode decompressed size in compressed block header
2) BZip2 -- encode decompressed size in compressed block header
3) LZ4 and LZ4HC -- instead of doing memcpy of size_t encode size as varint32. memcpy is very bad because the DB is not portable accross big/little endian machines or even platforms where size_t might be 8 or 4 bytes.

It does not affect format for snappy.

If you write a new database with format_version = 2, it will not be readable by RocksDB versions before 3.10. DB::Open() will return corruption in that case.

Test Plan:
Added a new test in db_test.
I will also run db_bench and verify VSIZE when block_cache == 1GB

Reviewers: yhchiang, rven, MarkCallaghan, dhruba, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31461
main
Igor Canadi 10 years ago
parent 2355931c6f
commit 9ab5adfc59
  1. 2
      HISTORY.md
  2. 41
      db/db_bench.cc
  3. 50
      db/db_test.cc
  4. 2
      include/rocksdb/version.h
  5. 30
      table/block_based_table_builder.cc
  6. 8
      table/block_based_table_factory.cc
  7. 24
      table/block_based_table_reader.cc
  8. 4
      table/block_based_table_reader.h
  9. 24
      table/format.cc
  10. 20
      table/format.h
  11. 37
      table/table_test.cc
  12. 22
      tools/db_sanity_test.cc
  13. 314
      util/compression.h

@ -8,6 +8,8 @@
Lower numbered levels will be placed earlier in the db_paths and higher Lower numbered levels will be placed earlier in the db_paths and higher
numbered levels will be placed later in the db_paths vector. numbered levels will be placed later in the db_paths vector.
* Potentially big performance improvements if you're using RocksDB with lots of column families (100-1000) * Potentially big performance improvements if you're using RocksDB with lots of column families (100-1000)
* Added BlockBasedTableOptions.format_version option, which allows user to specify which version of block based table he wants. As a general guidline, newer versions have more features, but might not be readable by older versions of RocksDB.
* Added new block based table format (version 2), which you can enable by setting BlockBasedTableOptions.format_version = 2. This format changes how we encode size information in compressed blocks and should help with memory allocations if you're using Zlib or BZip2 compressions.
### Public API changes ### Public API changes
* Deprecated skip_log_error_on_recovery option * Deprecated skip_log_error_on_recovery option

@ -1219,22 +1219,22 @@ class Benchmark {
name = "Snappy"; name = "Snappy";
break; break;
case kZlibCompression: case kZlibCompression:
result = Zlib_Compress(Options().compression_opts, text, strlen(text), result = Zlib_Compress(Options().compression_opts, 2, text,
&compressed); strlen(text), &compressed);
name = "Zlib"; name = "Zlib";
break; break;
case kBZip2Compression: case kBZip2Compression:
result = BZip2_Compress(Options().compression_opts, text, result = BZip2_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed); strlen(text), &compressed);
name = "BZip2"; name = "BZip2";
break; break;
case kLZ4Compression: case kLZ4Compression:
result = LZ4_Compress(Options().compression_opts, text, strlen(text), result = LZ4_Compress(Options().compression_opts, 2, text,
&compressed); strlen(text), &compressed);
name = "LZ4"; name = "LZ4";
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
result = LZ4HC_Compress(Options().compression_opts, text, result = LZ4HC_Compress(Options().compression_opts, 2, text,
strlen(text), &compressed); strlen(text), &compressed);
name = "LZ4HC"; name = "LZ4HC";
break; break;
@ -1779,19 +1779,19 @@ class Benchmark {
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, input.data(), ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, input.data(), ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, input.data(), ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, input.data(), ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
default: default:
@ -1825,19 +1825,19 @@ class Benchmark {
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, input.data(), input.size(), ok = Zlib_Compress(Options().compression_opts, 2, input.data(),
&compressed); input.size(), &compressed);
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, input.data(), ok = BZip2_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, input.data(), input.size(), ok = LZ4_Compress(Options().compression_opts, 2, input.data(),
&compressed); input.size(), &compressed);
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, input.data(), ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), &compressed); input.size(), &compressed);
break; break;
default: default:
@ -1857,22 +1857,22 @@ class Benchmark {
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(), uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(),
&decompress_size); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(), uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
&decompress_size); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(),
&decompress_size); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
default: default:
@ -2031,6 +2031,7 @@ class Benchmark {
block_based_options.block_size = FLAGS_block_size; block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval; block_based_options.block_restart_interval = FLAGS_block_restart_interval;
block_based_options.filter_policy = filter_policy_; block_based_options.filter_policy = filter_policy_;
block_based_options.format_version = 2;
options.table_factory.reset( options.table_factory.reset(
NewBlockBasedTableFactory(block_based_options)); NewBlockBasedTableFactory(block_based_options));
} }

@ -65,25 +65,25 @@ static bool SnappyCompressionSupported(const CompressionOptions& options) {
static bool ZlibCompressionSupported(const CompressionOptions& options) { static bool ZlibCompressionSupported(const CompressionOptions& options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return Zlib_Compress(options, in.data(), in.size(), &out); return Zlib_Compress(options, 2, in.data(), in.size(), &out);
} }
static bool BZip2CompressionSupported(const CompressionOptions& options) { static bool BZip2CompressionSupported(const CompressionOptions& options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return BZip2_Compress(options, in.data(), in.size(), &out); return BZip2_Compress(options, 2, in.data(), in.size(), &out);
} }
static bool LZ4CompressionSupported(const CompressionOptions &options) { static bool LZ4CompressionSupported(const CompressionOptions &options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return LZ4_Compress(options, in.data(), in.size(), &out); return LZ4_Compress(options, 2, in.data(), in.size(), &out);
} }
static bool LZ4HCCompressionSupported(const CompressionOptions &options) { static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return LZ4HC_Compress(options, in.data(), in.size(), &out); return LZ4HC_Compress(options, 2, in.data(), in.size(), &out);
} }
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random* rnd, int len) {
@ -10170,6 +10170,48 @@ TEST(DBTest, DontDeleteMovedFile) {
Reopen(options); Reopen(options);
} }
TEST(DBTest, EncodeDecompressedBlockSizeTest) {
// iter 0 -- zlib
// iter 1 -- bzip2
// iter 2 -- lz4
// iter 3 -- lz4HC
CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression};
for (int iter = 0; iter < 4; ++iter) {
// first_table_version 1 -- generate with table_version == 1, read with
// table_version == 2
// first_table_version 2 -- generate with table_version == 2, read with
// table_version == 1
for (int first_table_version = 1; first_table_version <= 2;
++first_table_version) {
BlockBasedTableOptions table_options;
table_options.format_version = first_table_version;
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.create_if_missing = true;
options.compression = compressions[iter];
DestroyAndReopen(options);
int kNumKeysWritten = 100000;
Random rnd(301);
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), RandomString(&rnd, 128) + std::string(128, 'a')));
}
table_options.format_version = first_table_version == 1 ? 2 : 1;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
ASSERT_EQ(r.substr(128), std::string(128, 'a'));
}
}
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -5,7 +5,7 @@
#pragma once #pragma once
#define ROCKSDB_MAJOR 3 #define ROCKSDB_MAJOR 3
#define ROCKSDB_MINOR 9 #define ROCKSDB_MINOR 10
#define ROCKSDB_PATCH 0 #define ROCKSDB_PATCH 0
// Do not use these. We made the mistake of declaring macros starting with // Do not use these. We made the mistake of declaring macros starting with

@ -302,9 +302,11 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u); return compressed_size < raw_size - (raw_size / 8u);
} }
// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options, const CompressionOptions& compression_options,
CompressionType* type, std::string* compressed_output) { CompressionType* type, uint32_t format_version,
std::string* compressed_output) {
if (*type == kNoCompression) { if (*type == kNoCompression) {
return raw; return raw;
} }
@ -320,29 +322,37 @@ Slice CompressBlock(const Slice& raw,
} }
break; // fall back to no compression. break; // fall back to no compression.
case kZlibCompression: case kZlibCompression:
if (Zlib_Compress(compression_options, raw.data(), raw.size(), if (Zlib_Compress(
compressed_output) && compression_options,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kBZip2Compression: case kBZip2Compression:
if (BZip2_Compress(compression_options, raw.data(), raw.size(), if (BZip2_Compress(
compressed_output) && compression_options,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4Compression: case kLZ4Compression:
if (LZ4_Compress(compression_options, raw.data(), raw.size(), if (LZ4_Compress(
compressed_output) && compression_options,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4HCCompression: case kLZ4HCCompression:
if (LZ4HC_Compress(compression_options, raw.data(), raw.size(), if (LZ4HC_Compress(
compressed_output) && compression_options,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
@ -579,7 +589,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (raw_block_contents.size() < kCompressionSizeLimit) { if (raw_block_contents.size() < kCompressionSizeLimit) {
block_contents = block_contents =
CompressBlock(raw_block_contents, r->compression_opts, &type, CompressBlock(raw_block_contents, r->compression_opts, &type,
&r->compressed_output); r->table_options.format_version, &r->compressed_output);
} else { } else {
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
type = kNoCompression; type = kNoCompression;

@ -14,11 +14,12 @@
#include <string> #include <string>
#include <stdint.h> #include <stdint.h>
#include "port/port.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "table/block_based_table_builder.h" #include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h" #include "table/block_based_table_reader.h"
#include "port/port.h" #include "table/format.h"
namespace rocksdb { namespace rocksdb {
@ -76,9 +77,10 @@ Status BlockBasedTableFactory::SanitizeOptions(
return Status::InvalidArgument("Enable cache_index_and_filter_blocks, " return Status::InvalidArgument("Enable cache_index_and_filter_blocks, "
", but block cache is disabled"); ", but block cache is disabled");
} }
if (table_options_.format_version > 1) { if (!BlockBasedTableSupportedVersion(table_options_.format_version)) {
return Status::InvalidArgument( return Status::InvalidArgument(
"We currently only support versions 0 and 1"); "Unsupported BlockBasedTable format_version. Please check "
"include/rocksdb/table.h for more info");
} }
return Status::OK(); return Status::OK();
} }

@ -442,9 +442,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (footer.version() > 1) { if (!BlockBasedTableSupportedVersion(footer.version())) {
return Status::Corruption( return Status::Corruption(
"Unknown Footer version. Maybe this file was created with too new " "Unknown Footer version. Maybe this file was created with newer "
"version of RocksDB?"); "version of RocksDB?");
} }
@ -605,7 +605,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key, const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block) { BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version) {
Status s; Status s;
Block* compressed_block = nullptr; Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr;
@ -648,7 +648,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
s = UncompressBlockContents(compressed_block->data(), s = UncompressBlockContents(compressed_block->data(),
compressed_block->size(), &contents); compressed_block->size(), &contents,
format_version);
// Insert uncompressed block into block cache // Insert uncompressed block into block cache
if (s.ok()) { if (s.ok()) {
@ -673,7 +674,7 @@ Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key, const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics, const ReadOptions& read_options, Statistics* statistics,
CachableEntry<Block>* block, Block* raw_block) { CachableEntry<Block>* block, Block* raw_block, uint32_t format_version) {
assert(raw_block->compression_type() == kNoCompression || assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr); block_cache_compressed != nullptr);
@ -681,8 +682,8 @@ Status BlockBasedTable::PutDataBlockToCache(
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(), s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents,
&contents); format_version);
} }
if (!s.ok()) { if (!s.ok()) {
delete raw_block; delete raw_block;
@ -929,7 +930,8 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
} }
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
statistics, ro, &block); statistics, ro, &block,
rep->table_options.format_version);
if (block.value == nullptr && !no_io && ro.fill_cache) { if (block.value == nullptr && !no_io && ro.fill_cache) {
Block* raw_block = nullptr; Block* raw_block = nullptr;
@ -942,7 +944,8 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
if (s.ok()) { if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, statistics, &block, raw_block); ro, statistics, &block, raw_block,
rep->table_options.format_version);
} }
} }
} }
@ -1194,7 +1197,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
Slice ckey; Slice ckey;
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr, s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr,
options, &block); options, &block,
rep_->table_options.format_version);
assert(s.ok()); assert(s.ok());
bool in_cache = block.value != nullptr; bool in_cache = block.value != nullptr;
if (in_cache) { if (in_cache) {

@ -150,7 +150,7 @@ class BlockBasedTable : public TableReader {
const Slice& block_cache_key, const Slice& compressed_block_cache_key, const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics,
const ReadOptions& read_options, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block); BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version);
// Put a raw block (maybe compressed) to the corresponding block caches. // Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then // This method will perform decompression against raw_block if needed and then
// populate the block caches. // populate the block caches.
@ -163,7 +163,7 @@ class BlockBasedTable : public TableReader {
const Slice& block_cache_key, const Slice& compressed_block_cache_key, const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, Statistics* statistics, const ReadOptions& read_options, Statistics* statistics,
CachableEntry<Block>* block, Block* raw_block); CachableEntry<Block>* block, Block* raw_block, uint32_t format_version);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false. // after a call to Seek(key), until handle_result returns false.

@ -331,7 +331,7 @@ Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
compression_type = static_cast<rocksdb::CompressionType>(slice.data()[n]); compression_type = static_cast<rocksdb::CompressionType>(slice.data()[n]);
if (decompression_requested && compression_type != kNoCompression) { if (decompression_requested && compression_type != kNoCompression) {
return UncompressBlockContents(slice.data(), n, contents); return UncompressBlockContents(slice.data(), n, contents, footer.version());
} }
if (slice.data() != used_buf) { if (slice.data() != used_buf) {
@ -354,8 +354,10 @@ Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
// contents are uncompresed into this buffer. This // contents are uncompresed into this buffer. This
// buffer is returned via 'result' and it is upto the caller to // buffer is returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h
Status UncompressBlockContents(const char* data, size_t n, Status UncompressBlockContents(const char* data, size_t n,
BlockContents* contents) { BlockContents* contents,
uint32_t format_version) {
std::unique_ptr<char[]> ubuf; std::unique_ptr<char[]> ubuf;
int decompress_size = 0; int decompress_size = 0;
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
@ -375,8 +377,9 @@ Status UncompressBlockContents(const char* data, size_t n,
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf = ubuf = std::unique_ptr<char[]>(Zlib_Uncompress(
std::unique_ptr<char[]>(Zlib_Uncompress(data, n, &decompress_size)); data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents"; "Zlib not supported or corrupted Zlib compressed block contents";
@ -386,8 +389,9 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kBZip2Compression: case kBZip2Compression:
ubuf = ubuf = std::unique_ptr<char[]>(BZip2_Uncompress(
std::unique_ptr<char[]>(BZip2_Uncompress(data, n, &decompress_size)); data, n, &decompress_size,
GetCompressFormatForVersion(kBZip2Compression, format_version)));
if (!ubuf) { if (!ubuf) {
static char bzip2_corrupt_msg[] = static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents"; "Bzip2 not supported or corrupted Bzip2 compressed block contents";
@ -397,7 +401,9 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf = std::unique_ptr<char[]>(LZ4_Uncompress(data, n, &decompress_size)); ubuf = std::unique_ptr<char[]>(LZ4_Uncompress(
data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents"; "LZ4 not supported or corrupted LZ4 compressed block contents";
@ -407,7 +413,9 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf = std::unique_ptr<char[]>(LZ4_Uncompress(data, n, &decompress_size)); ubuf = std::unique_ptr<char[]>(LZ4_Uncompress(
data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents"; "LZ4HC not supported or corrupted LZ4HC compressed block contents";

@ -65,6 +65,21 @@ class BlockHandle {
static const BlockHandle kNullBlockHandle; static const BlockHandle kNullBlockHandle;
}; };
inline uint32_t GetCompressFormatForVersion(CompressionType compression_type,
uint32_t version) {
// snappy is not versioned
assert(compression_type != kSnappyCompression &&
compression_type != kNoCompression);
// As of version 2, we encode compressed block with
// compress_format_version == 2. Before that, the version is 1.
// DO NOT CHANGE THIS FUNCTION, it affects disk format
return version >= 2 ? 2 : 1;
}
inline bool BlockBasedTableSupportedVersion(uint32_t version) {
return version <= 2;
}
// Footer encapsulates the fixed information stored at the tail // Footer encapsulates the fixed information stored at the tail
// end of every table file. // end of every table file.
class Footer { class Footer {
@ -191,8 +206,11 @@ extern Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
// contents are uncompresed into this buffer. This buffer is // contents are uncompresed into this buffer. This buffer is
// returned via 'result' and it is upto the caller to // returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
// For description of compress_format_version and possible values, see
// util/compression.h
extern Status UncompressBlockContents(const char* data, size_t n, extern Status UncompressBlockContents(const char* data, size_t n,
BlockContents* contents); BlockContents* contents,
uint32_t compress_format_version);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,

@ -545,7 +545,8 @@ static bool ZlibCompressionSupported() {
#ifdef ZLIB #ifdef ZLIB
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return Zlib_Compress(Options().compression_opts, in.data(), in.size(), &out); return Zlib_Compress(Options().compression_opts, 2, in.data(), in.size(),
&out);
#else #else
return false; return false;
#endif #endif
@ -555,7 +556,8 @@ static bool BZip2CompressionSupported() {
#ifdef BZIP2 #ifdef BZIP2
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return BZip2_Compress(Options().compression_opts, in.data(), in.size(), &out); return BZip2_Compress(Options().compression_opts, 2, in.data(), in.size(),
&out);
#else #else
return false; return false;
#endif #endif
@ -565,7 +567,8 @@ static bool LZ4CompressionSupported() {
#ifdef LZ4 #ifdef LZ4
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return LZ4_Compress(Options().compression_opts, in.data(), in.size(), &out); return LZ4_Compress(Options().compression_opts, 2, in.data(), in.size(),
&out);
#else #else
return false; return false;
#endif #endif
@ -575,7 +578,8 @@ static bool LZ4HCCompressionSupported() {
#ifdef LZ4 #ifdef LZ4
std::string out; std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return LZ4HC_Compress(Options().compression_opts, in.data(), in.size(), &out); return LZ4HC_Compress(Options().compression_opts, 2, in.data(), in.size(),
&out);
#else #else
return false; return false;
#endif #endif
@ -596,6 +600,7 @@ struct TestArgs {
bool reverse_compare; bool reverse_compare;
int restart_interval; int restart_interval;
CompressionType compression; CompressionType compression;
uint32_t format_version;
}; };
static std::vector<TestArgs> GenerateArgList() { static std::vector<TestArgs> GenerateArgList() {
@ -609,22 +614,26 @@ static std::vector<TestArgs> GenerateArgList() {
std::vector<int> restart_intervals = {16, 1, 1024}; std::vector<int> restart_intervals = {16, 1, 1024};
// Only add compression if it is supported // Only add compression if it is supported
std::vector<CompressionType> compression_types; std::vector<std::pair<CompressionType, bool>> compression_types;
compression_types.push_back(kNoCompression); compression_types.emplace_back(kNoCompression, false);
if (SnappyCompressionSupported()) { if (SnappyCompressionSupported()) {
compression_types.push_back(kSnappyCompression); compression_types.emplace_back(kSnappyCompression, false);
} }
if (ZlibCompressionSupported()) { if (ZlibCompressionSupported()) {
compression_types.push_back(kZlibCompression); compression_types.emplace_back(kZlibCompression, false);
compression_types.emplace_back(kZlibCompression, true);
} }
if (BZip2CompressionSupported()) { if (BZip2CompressionSupported()) {
compression_types.push_back(kBZip2Compression); compression_types.emplace_back(kBZip2Compression, false);
compression_types.emplace_back(kBZip2Compression, true);
} }
if (LZ4CompressionSupported()) { if (LZ4CompressionSupported()) {
compression_types.push_back(kLZ4Compression); compression_types.emplace_back(kLZ4Compression, false);
compression_types.emplace_back(kLZ4Compression, true);
} }
if (LZ4HCCompressionSupported()) { if (LZ4HCCompressionSupported()) {
compression_types.push_back(kLZ4HCCompression); compression_types.emplace_back(kLZ4HCCompression, false);
compression_types.emplace_back(kLZ4HCCompression, true);
} }
for (auto test_type : test_types) { for (auto test_type : test_types) {
@ -636,7 +645,7 @@ static std::vector<TestArgs> GenerateArgList() {
one_arg.type = test_type; one_arg.type = test_type;
one_arg.reverse_compare = reverse_compare; one_arg.reverse_compare = reverse_compare;
one_arg.restart_interval = restart_intervals[0]; one_arg.restart_interval = restart_intervals[0];
one_arg.compression = compression_types[0]; one_arg.compression = compression_types[0].first;
test_args.push_back(one_arg); test_args.push_back(one_arg);
continue; continue;
} }
@ -647,7 +656,8 @@ static std::vector<TestArgs> GenerateArgList() {
one_arg.type = test_type; one_arg.type = test_type;
one_arg.reverse_compare = reverse_compare; one_arg.reverse_compare = reverse_compare;
one_arg.restart_interval = restart_interval; one_arg.restart_interval = restart_interval;
one_arg.compression = compression_type; one_arg.compression = compression_type.first;
one_arg.format_version = compression_type.second ? 2 : 1;
test_args.push_back(one_arg); test_args.push_back(one_arg);
} }
} }
@ -718,6 +728,7 @@ class Harness {
new FlushBlockBySizePolicyFactory()); new FlushBlockBySizePolicyFactory());
table_options_.block_size = 256; table_options_.block_size = 256;
table_options_.block_restart_interval = args.restart_interval; table_options_.block_restart_interval = args.restart_interval;
table_options_.format_version = args.format_version;
options_.table_factory.reset( options_.table_factory.reset(
new BlockBasedTableFactory(table_options_)); new BlockBasedTableFactory(table_options_));
constructor_ = new TableConstructor(options_.comparator); constructor_ = new TableConstructor(options_.comparator);

@ -133,6 +133,22 @@ class SanityTestZlibCompression : public SanityTest {
Options options_; Options options_;
}; };
class SanityTestZlibCompressionVersion2 : public SanityTest {
public:
explicit SanityTestZlibCompressionVersion2(const std::string& path)
: SanityTest(path) {
options_.compression = kZlibCompression;
BlockBasedTableOptions table_options;
table_options.format_version = 2;
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
virtual Options GetOptions() const { return options_; }
virtual std::string Name() const { return "ZlibCompressionVersion2"; }
private:
Options options_;
};
class SanityTestLZ4Compression : public SanityTest { class SanityTestLZ4Compression : public SanityTest {
public: public:
explicit SanityTestLZ4Compression(const std::string& path) explicit SanityTestLZ4Compression(const std::string& path)
@ -197,6 +213,7 @@ bool RunSanityTests(const std::string& command, const std::string& path) {
std::vector<SanityTest*> sanity_tests = { std::vector<SanityTest*> sanity_tests = {
new SanityTestBasic(path), new SanityTestSpecialComparator(path), new SanityTestBasic(path), new SanityTestSpecialComparator(path),
new SanityTestZlibCompression(path), new SanityTestZlibCompression(path),
new SanityTestZlibCompressionVersion2(path),
new SanityTestLZ4Compression(path), new SanityTestLZ4Compression(path),
new SanityTestLZ4HCCompression(path), new SanityTestLZ4HCCompression(path),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -209,6 +226,7 @@ bool RunSanityTests(const std::string& command, const std::string& path) {
} else { } else {
fprintf(stderr, "Verifying...\n"); fprintf(stderr, "Verifying...\n");
} }
bool result = true;
for (auto sanity_test : sanity_tests) { for (auto sanity_test : sanity_tests) {
Status s; Status s;
fprintf(stderr, "%s -- ", sanity_test->Name().c_str()); fprintf(stderr, "%s -- ", sanity_test->Name().c_str());
@ -221,12 +239,12 @@ bool RunSanityTests(const std::string& command, const std::string& path) {
fprintf(stderr, "%s\n", s.ToString().c_str()); fprintf(stderr, "%s\n", s.ToString().c_str());
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "FAIL\n"); fprintf(stderr, "FAIL\n");
return false; result = false;
} }
delete sanity_test; delete sanity_test;
} }
return true; return result;
} }
} // namespace } // namespace

@ -9,7 +9,11 @@
// //
#pragma once #pragma once
#include <algorithm>
#include <limits>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "util/coding.h"
#ifdef SNAPPY #ifdef SNAPPY
#include <snappy.h> #include <snappy.h>
@ -30,6 +34,13 @@
namespace rocksdb { namespace rocksdb {
// compress_format_version can have two values:
// 1 -- decompressed sizes for BZip2 and Zlib are not included in the compressed
// block. Also, decompressed sizes for LZ4 are encoded in platform-dependent
// way.
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
// start of compressed block. Snappy format is the same as version 1.
inline bool Snappy_Compress(const CompressionOptions& opts, const char* input, inline bool Snappy_Compress(const CompressionOptions& opts, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef SNAPPY #ifdef SNAPPY
@ -61,9 +72,50 @@ inline bool Snappy_Uncompress(const char* input, size_t length,
#endif #endif
} }
inline bool Zlib_Compress(const CompressionOptions& opts, const char* input, namespace compression {
size_t length, ::std::string* output) { // returns size
inline size_t PutDecompressedSizeInfo(std::string* output, uint32_t length) {
PutVarint32(output, length);
return output->size();
}
inline bool GetDecompressedSizeInfo(const char** input_data,
size_t* input_length,
uint32_t* output_len) {
auto new_input_data =
GetVarint32Ptr(*input_data, *input_data + *input_length, output_len);
if (new_input_data == nullptr) {
return false;
}
*input_length -= (new_input_data - *input_data);
*input_data = new_input_data;
return true;
}
} // namespace compression
// compress_format_version == 1 -- decompressed size is not included in the
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline bool Zlib_Compress(const CompressionOptions& opts,
uint32_t compress_format_version,
const char* input, size_t length,
::std::string* output) {
#ifdef ZLIB #ifdef ZLIB
if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB
return false;
}
size_t output_header_len = 0;
if (compress_format_version == 2) {
output_header_len = compression::PutDecompressedSizeInfo(
output, static_cast<uint32_t>(length));
}
// Resize output to be the plain data length.
// This may not be big enough if the compression actually expands data.
output->resize(output_header_len + length);
// The memLevel parameter specifies how much memory should be allocated for // The memLevel parameter specifies how much memory should be allocated for
// the internal compression state. // the internal compression state.
// memLevel=1 uses minimum memory but is slow and reduces compression ratio. // memLevel=1 uses minimum memory but is slow and reduces compression ratio.
@ -78,19 +130,14 @@ inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
return false; return false;
} }
// Resize output to be the plain data length.
// This may not be big enough if the compression actually expands data.
output->resize(length);
// Compress the input, and put compressed data in output. // Compress the input, and put compressed data in output.
_stream.next_in = (Bytef *)input; _stream.next_in = (Bytef *)input;
_stream.avail_in = static_cast<unsigned int>(length); _stream.avail_in = static_cast<unsigned int>(length);
// Initialize the output size. // Initialize the output size.
_stream.avail_out = static_cast<unsigned int>(length); _stream.avail_out = static_cast<unsigned int>(length);
_stream.next_out = (Bytef*)&(*output)[0]; _stream.next_out = reinterpret_cast<Bytef*>(&(*output)[output_header_len]);
size_t old_sz = 0, new_sz = 0, new_sz_delta = 0;
bool done = false; bool done = false;
while (!done) { while (!done) {
st = deflate(&_stream, Z_FINISH); st = deflate(&_stream, Z_FINISH);
@ -99,16 +146,9 @@ inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
done = true; done = true;
break; break;
case Z_OK: case Z_OK:
// No output space. Increase the output space by 20%. // No output space. This means the compression is bigger than
// (Should we fail the compression since it expands the size?) // decompressed size. Just fail the compression in that case.
old_sz = output->size(); // Intentional fallback (to failure case)
new_sz_delta = static_cast<size_t>(output->size() * 0.2);
new_sz = output->size() + (new_sz_delta < 10 ? 10 : new_sz_delta);
output->resize(new_sz);
// Set more output.
_stream.next_out = (Bytef *)&(*output)[old_sz];
_stream.avail_out = static_cast<unsigned int>(new_sz - old_sz);
break;
case Z_BUF_ERROR: case Z_BUF_ERROR:
default: default:
deflateEnd(&_stream); deflateEnd(&_stream);
@ -116,16 +156,37 @@ inline bool Zlib_Compress(const CompressionOptions& opts, const char* input,
} }
} }
output->resize(output->size() - _stream.avail_out); output->resize(output->size() - _stream.avail_out + output_header_len);
deflateEnd(&_stream); deflateEnd(&_stream);
return true; return true;
#endif #endif
return false; return false;
} }
// compress_format_version == 1 -- decompressed size is not included in the
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline char* Zlib_Uncompress(const char* input_data, size_t input_length, inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
int* decompress_size, int windowBits = -14) { int* decompress_size,
uint32_t compress_format_version,
int windowBits = -14) {
#ifdef ZLIB #ifdef ZLIB
uint32_t output_len = 0;
if (compress_format_version == 2) {
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
&output_len)) {
return nullptr;
}
} else {
// Assume the decompressed data size will 5x of compressed size, but round
// to the page size
size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
output_len = static_cast<uint32_t>(
std::min(proposed_output_len,
static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
}
z_stream _stream; z_stream _stream;
memset(&_stream, 0, sizeof(z_stream)); memset(&_stream, 0, sizeof(z_stream));
@ -141,31 +202,27 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
_stream.next_in = (Bytef *)input_data; _stream.next_in = (Bytef *)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length); _stream.avail_in = static_cast<unsigned int>(input_length);
// Assume the decompressed data size will 5x of compressed size.
size_t output_len = input_length * 5;
char* output = new char[output_len]; char* output = new char[output_len];
size_t old_sz = output_len;
_stream.next_out = (Bytef *)output; _stream.next_out = (Bytef *)output;
_stream.avail_out = static_cast<unsigned int>(output_len); _stream.avail_out = static_cast<unsigned int>(output_len);
char* tmp = nullptr;
size_t output_len_delta;
bool done = false; bool done = false;
//while(_stream.next_in != nullptr && _stream.avail_in != 0) {
while (!done) { while (!done) {
st = inflate(&_stream, Z_SYNC_FLUSH); st = inflate(&_stream, Z_SYNC_FLUSH);
switch (st) { switch (st) {
case Z_STREAM_END: case Z_STREAM_END:
done = true; done = true;
break; break;
case Z_OK: case Z_OK: {
// No output space. Increase the output space by 20%. // No output space. Increase the output space by 20%.
old_sz = output_len; // We should never run out of output space if
output_len_delta = static_cast<size_t>(output_len * 0.2); // compress_format_version == 2
assert(compress_format_version != 2);
size_t old_sz = output_len;
size_t output_len_delta = static_cast<size_t>(output_len * 0.2);
output_len += output_len_delta < 10 ? 10 : output_len_delta; output_len += output_len_delta < 10 ? 10 : output_len_delta;
tmp = new char[output_len]; char* tmp = new char[output_len];
memcpy(tmp, output, old_sz); memcpy(tmp, output, old_sz);
delete[] output; delete[] output;
output = tmp; output = tmp;
@ -174,6 +231,7 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
_stream.next_out = (Bytef *)(output + old_sz); _stream.next_out = (Bytef *)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz); _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break; break;
}
case Z_BUF_ERROR: case Z_BUF_ERROR:
default: default:
delete[] output; delete[] output;
@ -182,6 +240,8 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
} }
} }
// If we encoded decompressed block size, we should have no bytes left
assert(compress_format_version != 2 || _stream.avail_out == 0);
*decompress_size = static_cast<int>(output_len - _stream.avail_out); *decompress_size = static_cast<int>(output_len - _stream.avail_out);
inflateEnd(&_stream); inflateEnd(&_stream);
return output; return output;
@ -190,9 +250,29 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
return nullptr; return nullptr;
} }
inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, // compress_format_version == 1 -- decompressed size is not included in the
size_t length, ::std::string* output) { // block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline bool BZip2_Compress(const CompressionOptions& opts,
uint32_t compress_format_version,
const char* input, size_t length,
::std::string* output) {
#ifdef BZIP2 #ifdef BZIP2
if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB
return false;
}
size_t output_header_len = 0;
if (compress_format_version == 2) {
output_header_len = compression::PutDecompressedSizeInfo(
output, static_cast<uint32_t>(length));
}
// Resize output to be the plain data length.
// This may not be big enough if the compression actually expands data.
output->resize(output_header_len + length);
bz_stream _stream; bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream)); memset(&_stream, 0, sizeof(bz_stream));
@ -204,34 +284,23 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
return false; return false;
} }
// Resize output to be the plain data length.
// This may not be big enough if the compression actually expands data.
output->resize(length);
// Compress the input, and put compressed data in output. // Compress the input, and put compressed data in output.
_stream.next_in = (char *)input; _stream.next_in = (char *)input;
_stream.avail_in = static_cast<unsigned int>(length); _stream.avail_in = static_cast<unsigned int>(length);
// Initialize the output size. // Initialize the output size.
_stream.next_out = (char *)&(*output)[0];
_stream.avail_out = static_cast<unsigned int>(length); _stream.avail_out = static_cast<unsigned int>(length);
_stream.next_out = reinterpret_cast<char*>(&(*output)[output_header_len]);
size_t old_sz = 0, new_sz = 0;
while (_stream.next_in != nullptr && _stream.avail_in != 0) { while (_stream.next_in != nullptr && _stream.avail_in != 0) {
st = BZ2_bzCompress(&_stream, BZ_FINISH); st = BZ2_bzCompress(&_stream, BZ_FINISH);
switch (st) { switch (st) {
case BZ_STREAM_END: case BZ_STREAM_END:
break; break;
case BZ_FINISH_OK: case BZ_FINISH_OK:
// No output space. Increase the output space by 20%. // No output space. This means the compression is bigger than
// (Should we fail the compression since it expands the size?) // decompressed size. Just fail the compression in that case
old_sz = output->size(); // Intentional fallback (to failure case)
new_sz = static_cast<size_t>(output->size() * 1.2);
output->resize(new_sz);
// Set more output.
_stream.next_out = (char *)&(*output)[old_sz];
_stream.avail_out = static_cast<unsigned int>(new_sz - old_sz);
break;
case BZ_SEQUENCE_ERROR: case BZ_SEQUENCE_ERROR:
default: default:
BZ2_bzCompressEnd(&_stream); BZ2_bzCompressEnd(&_stream);
@ -239,16 +308,36 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
} }
} }
output->resize(output->size() - _stream.avail_out); output->resize(output->size() - _stream.avail_out + output_header_len);
BZ2_bzCompressEnd(&_stream); BZ2_bzCompressEnd(&_stream);
return true; return true;
#endif #endif
return false; return false;
} }
// compress_format_version == 1 -- decompressed size is not included in the
// block header
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline char* BZip2_Uncompress(const char* input_data, size_t input_length, inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) { int* decompress_size,
uint32_t compress_format_version) {
#ifdef BZIP2 #ifdef BZIP2
uint32_t output_len = 0;
if (compress_format_version == 2) {
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
&output_len)) {
return nullptr;
}
} else {
// Assume the decompressed data size will 5x of compressed size, but round
// to the next page size
size_t proposed_output_len = ((input_length * 5) & (~(4096 - 1))) + 4096;
output_len = static_cast<uint32_t>(
std::min(proposed_output_len,
static_cast<size_t>(std::numeric_limits<uint32_t>::max())));
}
bz_stream _stream; bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream)); memset(&_stream, 0, sizeof(bz_stream));
@ -260,26 +349,26 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
_stream.next_in = (char *)input_data; _stream.next_in = (char *)input_data;
_stream.avail_in = static_cast<unsigned int>(input_length); _stream.avail_in = static_cast<unsigned int>(input_length);
// Assume the decompressed data size will be 5x of compressed size.
size_t output_len = input_length * 5;
char* output = new char[output_len]; char* output = new char[output_len];
size_t old_sz = output_len;
_stream.next_out = (char *)output; _stream.next_out = (char *)output;
_stream.avail_out = static_cast<unsigned int>(output_len); _stream.avail_out = static_cast<unsigned int>(output_len);
char* tmp = nullptr; bool done = false;
while (!done) {
while(_stream.next_in != nullptr && _stream.avail_in != 0) {
st = BZ2_bzDecompress(&_stream); st = BZ2_bzDecompress(&_stream);
switch (st) { switch (st) {
case BZ_STREAM_END: case BZ_STREAM_END:
done = true;
break; break;
case BZ_OK: case BZ_OK: {
// No output space. Increase the output space by 20%. // No output space. Increase the output space by 20%.
old_sz = output_len; // We should never run out of output space if
output_len = static_cast<size_t>(output_len * 1.2); // compress_format_version == 2
tmp = new char[output_len]; assert(compress_format_version != 2);
uint32_t old_sz = output_len;
output_len = output_len * 1.2;
char* tmp = new char[output_len];
memcpy(tmp, output, old_sz); memcpy(tmp, output, old_sz);
delete[] output; delete[] output;
output = tmp; output = tmp;
@ -288,6 +377,7 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
_stream.next_out = (char *)(output + old_sz); _stream.next_out = (char *)(output + old_sz);
_stream.avail_out = static_cast<unsigned int>(output_len - old_sz); _stream.avail_out = static_cast<unsigned int>(output_len - old_sz);
break; break;
}
default: default:
delete[] output; delete[] output;
BZ2_bzDecompressEnd(&_stream); BZ2_bzDecompressEnd(&_stream);
@ -295,6 +385,8 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
} }
} }
// If we encoded decompressed block size, we should have no bytes left
assert(compress_format_version != 2 || _stream.avail_out == 0);
*decompress_size = static_cast<int>(output_len - _stream.avail_out); *decompress_size = static_cast<int>(output_len - _stream.avail_out);
BZ2_bzDecompressEnd(&_stream); BZ2_bzDecompressEnd(&_stream);
return output; return output;
@ -302,66 +394,132 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
return nullptr; return nullptr;
} }
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input, // compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline bool LZ4_Compress(const CompressionOptions& opts,
uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef LZ4 #ifdef LZ4
int compressBound = LZ4_compressBound(static_cast<int>(length)); if (length > std::numeric_limits<uint32_t>::max()) {
output->resize(static_cast<size_t>(8 + compressBound)); // Can't compress more than 4GB
return false;
}
size_t output_header_len = 0;
if (compress_format_version == 2) {
// new encoding, using varint32 to store size information
output_header_len = compression::PutDecompressedSizeInfo(
output, static_cast<uint32_t>(length));
} else {
// legacy encoding, which is not really portable (depends on big/little
// endianness)
output_header_len = 8;
output->resize(output_header_len);
char* p = const_cast<char*>(output->c_str()); char* p = const_cast<char*>(output->c_str());
memcpy(p, &length, sizeof(length)); memcpy(p, &length, sizeof(length));
int outlen = LZ4_compress_limitedOutput( }
input, p + 8, static_cast<int>(length), compressBound);
int compressBound = LZ4_compressBound(static_cast<int>(length));
output->resize(static_cast<size_t>(output_header_len + compressBound));
int outlen =
LZ4_compress_limitedOutput(input, &(*output)[output_header_len],
static_cast<int>(length), compressBound);
if (outlen == 0) { if (outlen == 0) {
return false; return false;
} }
output->resize(static_cast<size_t>(8 + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#endif #endif
return false; return false;
} }
// compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline char* LZ4_Uncompress(const char* input_data, size_t input_length, inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) { int* decompress_size,
uint32_t compress_format_version) {
#ifdef LZ4 #ifdef LZ4
uint32_t output_len = 0;
if (compress_format_version == 2) {
// new encoding, using varint32 to store size information
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
&output_len)) {
return nullptr;
}
} else {
// legacy encoding, which is not really portable (depends on big/little
// endianness)
if (input_length < 8) { if (input_length < 8) {
return nullptr; return nullptr;
} }
int output_len;
memcpy(&output_len, input_data, sizeof(output_len)); memcpy(&output_len, input_data, sizeof(output_len));
input_length -= 8;
input_data += 8;
}
char* output = new char[output_len]; char* output = new char[output_len];
*decompress_size = LZ4_decompress_safe_partial( *decompress_size =
input_data + 8, output, static_cast<int>(input_length - 8), output_len, LZ4_decompress_safe(input_data, output, static_cast<int>(input_length),
output_len); static_cast<int>(output_len));
if (*decompress_size < 0) { if (*decompress_size < 0) {
delete[] output; delete[] output;
return nullptr; return nullptr;
} }
assert(*decompress_size == static_cast<int>(output_len));
return output; return output;
#endif #endif
return nullptr; return nullptr;
} }
inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input, // compress_format_version == 1 -- decompressed size is included in the
// block header using memcpy, which makes database non-portable)
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format
inline bool LZ4HC_Compress(const CompressionOptions& opts,
uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef LZ4 #ifdef LZ4
int compressBound = LZ4_compressBound(static_cast<int>(length)); if (length > std::numeric_limits<uint32_t>::max()) {
output->resize(static_cast<size_t>(8 + compressBound)); // Can't compress more than 4GB
return false;
}
size_t output_header_len = 0;
if (compress_format_version == 2) {
// new encoding, using varint32 to store size information
output_header_len = compression::PutDecompressedSizeInfo(
output, static_cast<uint32_t>(length));
} else {
// legacy encoding, which is not really portable (depends on big/little
// endianness)
output_header_len = 8;
output->resize(output_header_len);
char* p = const_cast<char*>(output->c_str()); char* p = const_cast<char*>(output->c_str());
memcpy(p, &length, sizeof(length)); memcpy(p, &length, sizeof(length));
}
int compressBound = LZ4_compressBound(static_cast<int>(length));
output->resize(static_cast<size_t>(output_header_len + compressBound));
int outlen; int outlen;
#ifdef LZ4_VERSION_MAJOR // they only started defining this since r113 #ifdef LZ4_VERSION_MAJOR // they only started defining this since r113
outlen = LZ4_compressHC2_limitedOutput(input, p + 8, static_cast<int>(length), outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len],
static_cast<int>(length),
compressBound, opts.level); compressBound, opts.level);
#else #else
outlen = LZ4_compressHC_limitedOutput(input, p + 8, static_cast<int>(length), outlen =
compressBound); LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len],
static_cast<int>(length), compressBound);
#endif #endif
if (outlen == 0) { if (outlen == 0) {
return false; return false;
} }
output->resize(static_cast<size_t>(8 + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#endif #endif
return false; return false;
} }
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save