add zlib compression

Summary: add zlib compression

Test Plan: Will add more testcases

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D3873
main
heyongqiang 13 years ago
parent 4e4b6812ff
commit 054a5657f8
  1. 12
      build_detect_platform
  2. 3
      include/leveldb/options.h
  3. 12
      port/port_android.h
  4. 126
      port/port_posix.h
  5. 15
      table/format.cc
  6. 22
      table/table_builder.cc
  7. 144
      table/table_test.cc

@ -51,7 +51,7 @@ case "$TARGET_OS" in
;;
Linux)
PLATFORM=OS_LINUX
COMMON_FLAGS="-fno-builtin-memcmp -pthread -DOS_LINUX -fPIC"
COMMON_FLAGS="-I/usr/include -fno-builtin-memcmp -pthread -DOS_LINUX -fPIC"
PLATFORM_LDFLAGS="-pthread"
PORT_FILE=port/port_posix.cc
;;
@ -139,6 +139,16 @@ EOF
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lsnappy"
fi
# Test whether zlib library is installed
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <zlib.h>
int main() {}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DZLIB"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lz"
fi
# Test whether tcmalloc is available
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
int main() {}

@ -25,7 +25,8 @@ enum CompressionType {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1
kSnappyCompression = 0x1,
kZlibCompression =0x2
};
// Options to control the behavior of a database (passed to DB::Open)

@ -142,6 +142,18 @@ inline bool Snappy_Uncompress(
return false;
}
inline bool Zlib_Compress(const char* input, size_t length,
::std::string* output, int level = -1, int strategy = 0) {
return false;
}
inline bool Zlib_Uncompress(
const char* input_data,
size_t input_length,
char* output) {
return false;
}
inline uint64_t ThreadIdentifier() {
pthread_t tid = pthread_self();
uint64_t r = 0;

@ -27,8 +27,12 @@
#ifdef SNAPPY
#include <snappy.h>
#endif
#ifdef ZLIB
#include <zlib.h>
#endif
#include <stdint.h>
#include <string>
#include <string.h>
#include "port/atomic_pointer.h"
#ifdef LITTLE_ENDIAN
@ -119,6 +123,128 @@ inline bool Snappy_Uncompress(const char* input, size_t length,
#endif
}
inline bool Zlib_Compress(const char* input, size_t length,
::std::string* output, int windowBits = 15, int level = -1,
int strategy = 0) {
#ifdef ZLIB
// The memLevel parameter specifies how much memory should be allocated for
// the internal compression state.
// memLevel=1 uses minimum memory but is slow and reduces compression ratio.
// memLevel=9 uses maximum memory for optimal speed.
// The default value is 8. See zconf.h for more details.
static const int memLevel = 8;
z_stream _stream;
memset(&_stream, 0, sizeof(z_stream));
int st = deflateInit2(&_stream, level, Z_DEFLATED, windowBits,
memLevel, strategy);
if (st != Z_OK) {
return false;
}
// Resize output to be the plain data length.
// This may not be big enough if the compression actually expands data.
output->resize(length);
// Compress the input, and put compressed data in output.
_stream.next_in = (Bytef *)input;
_stream.avail_in = length;
// Initialize the output size.
_stream.avail_out = length;
_stream.next_out = (Bytef *)&(*output)[0];
int old_sz =0, new_sz =0;
while(_stream.next_in != NULL && _stream.avail_in != 0) {
int st = deflate(&_stream, Z_FINISH);
switch (st) {
case Z_STREAM_END:
break;
case Z_OK:
// No output space. Increase the output space by 20%.
// (Should we fail the compression since it expands the size?)
old_sz = output->size();
new_sz = output->size() * 1.2;
output->resize(new_sz);
// Set more output.
_stream.next_out = (Bytef *)&(*output)[old_sz];
_stream.avail_out = new_sz - old_sz;
break;
case Z_BUF_ERROR:
default:
deflateEnd(&_stream);
return false;
}
}
output->resize(output->size() - _stream.avail_out);
deflateEnd(&_stream);
return true;
#endif
return false;
}
inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
int* decompress_size, int windowBits = 15) {
#ifdef ZLIB
z_stream _stream;
memset(&_stream, 0, sizeof(z_stream));
// For raw inflate, the windowBits should be Ð8..Ð15.
// If windowBits is bigger than zero, it will use either zlib
// header or gzip header. Adding 32 to it will do automatic detection.
int st = inflateInit2(&_stream,
windowBits > 0 ? windowBits + 32 : windowBits);
if (st != Z_OK) {
return NULL;
}
_stream.next_in = (Bytef *)input_data;
_stream.avail_in = input_length;
// Assume the decompressed data size will 5x of compressed size.
int output_len = input_length * 5;
char* output = new char[output_len];
int old_sz = output_len;
_stream.next_out = (Bytef *)output;
_stream.avail_out = output_len;
char* tmp = NULL;
while(_stream.next_in != NULL && _stream.avail_in != 0) {
int st = inflate(&_stream, Z_SYNC_FLUSH);
switch (st) {
case Z_STREAM_END:
break;
case Z_OK:
// No output space. Increase the output space by 20%.
old_sz = output_len;
output_len = output_len * 1.2;
tmp = new char[output_len];
memcpy(tmp, output, old_sz);
delete[] output;
output = tmp;
// Set more output.
_stream.next_out = (Bytef *)(output + old_sz);
_stream.avail_out = output_len - old_sz;
break;
case Z_BUF_ERROR:
default:
delete[] output;
inflateEnd(&_stream);
return NULL;
}
}
*decompress_size = output_len - _stream.avail_out;
inflateEnd(&_stream);
return output;
#endif
return false;
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
return false;
}

@ -98,6 +98,7 @@ Status ReadBlock(RandomAccessFile* file,
}
}
char* ubuf = NULL;
switch (data[n]) {
case kNoCompression:
if (data != buf) {
@ -122,7 +123,7 @@ Status ReadBlock(RandomAccessFile* file,
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
}
char* ubuf = new char[ulength];
ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
@ -134,6 +135,18 @@ Status ReadBlock(RandomAccessFile* file,
result->cachable = true;
break;
}
case kZlibCompression:
int decompress_size;
ubuf = port::Zlib_Uncompress(data, n, &decompress_size);
if (!ubuf) {
delete[] buf;
return Status::Corruption("corrupted compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
break;
default:
delete[] buf;
return Status::Corruption("bad block type");

@ -136,6 +136,11 @@ void TableBuilder::Flush() {
}
}
static bool GoodCompressionRatio(int compressed_size, int raw_size) {
// Check to see if compressed less than 12.5%
return compressed_size < raw_size - (raw_size / 8u);
}
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
@ -147,7 +152,6 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
@ -156,16 +160,28 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// Snappy not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
case kZlibCompression:
std::string* compressed = &r->compressed_output;
if (port::Zlib_Compress(raw.data(), raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// Zlib not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();

@ -396,6 +396,18 @@ class DBConstructor: public Constructor {
DB* db_;
};
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
static bool ZlibCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(in.data(), in.size(), &out);
}
enum TestType {
TABLE_TEST,
BLOCK_TEST,
@ -407,32 +419,47 @@ struct TestArgs {
TestType type;
bool reverse_compare;
int restart_interval;
CompressionType compression;
};
static const TestArgs kTestArgList[] = {
{ TABLE_TEST, false, 16 },
{ TABLE_TEST, false, 1 },
{ TABLE_TEST, false, 1024 },
{ TABLE_TEST, true, 16 },
{ TABLE_TEST, true, 1 },
{ TABLE_TEST, true, 1024 },
{ BLOCK_TEST, false, 16 },
{ BLOCK_TEST, false, 1 },
{ BLOCK_TEST, false, 1024 },
{ BLOCK_TEST, true, 16 },
{ BLOCK_TEST, true, 1 },
{ BLOCK_TEST, true, 1024 },
// Restart interval does not matter for memtables
{ MEMTABLE_TEST, false, 16 },
{ MEMTABLE_TEST, true, 16 },
// Do not bother with restart interval variations for DB
{ DB_TEST, false, 16 },
{ DB_TEST, true, 16 },
};
static const int kNumTestArgs = sizeof(kTestArgList) / sizeof(kTestArgList[0]);
static std::vector<TestArgs> Generate_Arg_List()
{
std::vector<TestArgs> ret;
TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST};
int test_type_len = 4;
bool reverse_compare[2] = {false, true};
int reverse_compare_len = 2;
int restart_interval[3] = {16, 1, 1024};
int restart_interval_len = 3;
// Only add compression if it is supported
std::vector<CompressionType> compression_types;
compression_types.push_back(kNoCompression);
#ifdef SNAPPY
if (SnappyCompressionSupported())
compression_types.push_back(kSnappyCompression);
#endif
#ifdef ZLIB
if (ZlibCompressionSupported())
compression_types.push_back(kZlibCompression);
#endif
for(int i =0; i < test_type_len; i++)
for (int j =0; j < reverse_compare_len; j++)
for (int k =0; k < restart_interval_len; k++)
for (int n =0; n < compression_types.size(); n++) {
TestArgs one_arg;
one_arg.type = test_type[i];
one_arg.reverse_compare = reverse_compare[j];
one_arg.restart_interval = restart_interval[k];
one_arg.compression = compression_types[n];
ret.push_back(one_arg);
}
return ret;
}
class Harness {
public:
@ -444,6 +471,7 @@ class Harness {
options_ = Options();
options_.block_restart_interval = args.restart_interval;
options_.compression = args.compression;
// Use shorter block size for tests to exercise block boundary
// conditions more.
options_.block_size = 256;
@ -646,8 +674,9 @@ class Harness {
// Test the empty key
TEST(Harness, SimpleEmptyKey) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
std::vector<TestArgs> args = Generate_Arg_List();
for (int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 1);
Add("", "v");
Test(&rnd);
@ -655,8 +684,9 @@ TEST(Harness, SimpleEmptyKey) {
}
TEST(Harness, SimpleSingle) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
std::vector<TestArgs> args = Generate_Arg_List();
for (int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 2);
Add("abc", "v");
Test(&rnd);
@ -664,8 +694,9 @@ TEST(Harness, SimpleSingle) {
}
TEST(Harness, SimpleMulti) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
std::vector<TestArgs> args = Generate_Arg_List();
for (int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 3);
Add("abc", "v");
Add("abcd", "v");
@ -675,8 +706,9 @@ TEST(Harness, SimpleMulti) {
}
TEST(Harness, SimpleSpecialKey) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
std::vector<TestArgs> args = Generate_Arg_List();
for (int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 4);
Add("\xff\xff", "v3");
Test(&rnd);
@ -684,14 +716,15 @@ TEST(Harness, SimpleSpecialKey) {
}
TEST(Harness, Randomized) {
for (int i = 0; i < kNumTestArgs; i++) {
Init(kTestArgList[i]);
std::vector<TestArgs> args = Generate_Arg_List();
for (int i = 0; i < args.size(); i++) {
Init(args[i]);
Random rnd(test::RandomSeed() + 5);
for (int num_entries = 0; num_entries < 2000;
num_entries += (num_entries < 50 ? 1 : 200)) {
if ((num_entries % 10) == 0) {
fprintf(stderr, "case %d of %d: num_entries = %d\n",
(i + 1), int(kNumTestArgs), num_entries);
(i + 1), int(args.size()), num_entries);
}
for (int e = 0; e < num_entries; e++) {
std::string v;
@ -705,7 +738,7 @@ TEST(Harness, Randomized) {
TEST(Harness, RandomizedLongDB) {
Random rnd(test::RandomSeed());
TestArgs args = { DB_TEST, false, 16 };
TestArgs args = { DB_TEST, false, 16, kNoCompression };
Init(args);
int num_entries = 100000;
for (int e = 0; e < num_entries; e++) {
@ -797,18 +830,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
}
static bool SnappyCompressionSupported() {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(in.data(), in.size(), &out);
}
TEST(TableTest, ApproximateOffsetOfCompressed) {
if (!SnappyCompressionSupported()) {
fprintf(stderr, "skipping compression tests\n");
return;
}
static void Do_Compression_Test(CompressionType comp) {
Random rnd(301);
TableConstructor c(BytewiseComparator());
std::string tmp;
@ -820,7 +842,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
KVMap kvmap;
Options options;
options.block_size = 1024;
options.compression = kSnappyCompression;
options.compression = comp;
c.Finish(options, &keys, &kvmap);
ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
@ -831,6 +853,30 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 4000, 6000));
}
TEST(TableTest, ApproximateOffsetOfCompressed) {
CompressionType compression_state[2];
int valid = 0;
if (!SnappyCompressionSupported()) {
fprintf(stderr, "skipping snappy compression tests\n");
} else {
compression_state[valid] = kSnappyCompression;
valid++;
}
if (!ZlibCompressionSupported()) {
fprintf(stderr, "skipping zlib compression tests\n");
} else {
compression_state[valid] = kZlibCompression;
valid++;
}
for(int i =0; i < valid; i++)
{
Do_Compression_Test(compression_state[i]);
}
}
} // namespace leveldb
int main(int argc, char** argv) {

Loading…
Cancel
Save