[Proof-Of-Concept] RocksDB Blob Storage with a blob log file.

Summary:
This is a proof of concept of a RocksDB blob log file. The actual value of the Put() is appended to a blob log using normal data block format, and the handle of the block is written as the value of the key in RocksDB.

The prototype only supports Put() and Get(). It doesn't support DB restart, garbage collection, Write() call, iterator, snapshots, etc.

Test Plan: Add unit tests.

Reviewers: arahut

Reviewed By: arahut

Subscribers: kradhakrishnan, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61485
main
sdong 9 years ago
parent 4beffe001d
commit 8b79422b52
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 2
      src.mk
  4. 14
      tools/db_bench_tool.cc
  5. 209
      utilities/blob_db/blob_db.cc
  6. 22
      utilities/blob_db/blob_db.h
  7. 80
      utilities/blob_db/blob_db_test.cc

@ -248,6 +248,7 @@ set(SOURCES
util/xfunc.cc
util/xxhash.cc
utilities/backupable/backupable_db.cc
utilities/blob_db/blob_db.cc
utilities/checkpoint/checkpoint.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/date_tiered/date_tiered_db_impl.cc
@ -434,6 +435,7 @@ set(TESTS
util/thread_list_test.cc
util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc
utilities/blob_db/blob_db_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/date_tiered/date_tiered_test.cc
utilities/document/document_db_test.cc

@ -338,6 +338,7 @@ TESTS = \
ttl_test \
date_tiered_test \
backupable_db_test \
blob_db_test \
document_db_test \
json_document_test \
sim_cache_test \
@ -998,6 +999,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS)
backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
checkpoint_test: utilities/checkpoint/checkpoint_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -114,6 +114,7 @@ LIB_SOURCES = \
util/sharded_cache.cc \
util/sst_file_manager_impl.cc \
utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_db.cc \
utilities/convenience/info_log_finder.cc \
utilities/checkpoint/checkpoint.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
@ -287,6 +288,7 @@ MAIN_SOURCES = \
util/histogram_test.cc \
util/statistics_test.cc \
utilities/backupable/backupable_db_test.cc \
utilities/blob_db/blob_db_test.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/document/document_db_test.cc \
utilities/document/json_document_test.cc \

@ -67,6 +67,7 @@
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/merge_operators.h"
#ifdef OS_WIN
@ -568,6 +569,8 @@ DEFINE_string(
DEFINE_bool(report_bg_io_stats, false,
"Measure times spents on I/Os while in compactions. ");
DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. ");
enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
assert(ctype);
@ -2906,6 +2909,8 @@ class Benchmark {
db->db = ptr;
}
#endif // ROCKSDB_LITE
} else if (FLAGS_use_blob_db) {
s = NewBlobDB(options, db_name, &db->db);
} else {
s = DB::Open(options, db_name, &db->db);
}
@ -3057,7 +3062,10 @@ class Benchmark {
for (int64_t j = 0; j < entries_per_batch_; j++) {
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
if (FLAGS_num_column_families <= 1) {
if (FLAGS_use_blob_db) {
s = db_with_cfh->db->Put(write_options_, key,
gen.Generate(value_size_));
} else if (FLAGS_num_column_families <= 1) {
batch.Put(key, gen.Generate(value_size_));
} else {
// We use same rand_num as seed for key and column family so that we
@ -3068,7 +3076,9 @@ class Benchmark {
}
bytes += value_size_ + key_size_;
}
s = db_with_cfh->db->Write(write_options_, &batch);
if (!FLAGS_use_blob_db) {
s = db_with_cfh->db->Write(write_options_, &batch);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
entries_per_batch_, kWrite);
if (!s.ok()) {

@ -0,0 +1,209 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "utilities/blob_db/blob_db.h"
#ifndef ROCKSDB_LITE
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/stackable_db.h"
#include "table/block.h"
#include "table/block_based_table_builder.h"
#include "table/block_builder.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/instrumented_mutex.h"
namespace rocksdb {
namespace {
int kBlockBasedTableVersionFormat = 2;
} // namespace
class BlobDB : public StackableDB {
public:
using rocksdb::StackableDB::Put;
Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) override;
using rocksdb::StackableDB::Get;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Status Open();
explicit BlobDB(DB* db);
private:
std::string dbname_;
ImmutableCFOptions ioptions_;
InstrumentedMutex mutex_;
std::unique_ptr<RandomAccessFileReader> file_reader_;
std::unique_ptr<WritableFileWriter> file_writer_;
size_t writer_offset_;
size_t next_sync_offset_;
static const std::string kFileName;
static const size_t kBlockHeaderSize;
static const size_t kBytesPerSync;
};
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
DB* db;
Status s = DB::Open(options, dbname, &db);
if (!s.ok()) {
return s;
}
BlobDB* bdb = new BlobDB(db);
s = bdb->Open();
if (!s.ok()) {
delete bdb;
}
*blob_db = bdb;
return s;
}
const std::string BlobDB::kFileName = "blob_log";
const size_t BlobDB::kBlockHeaderSize = 8;
const size_t BlobDB::kBytesPerSync = 1024 * 1024 * 128;
BlobDB::BlobDB(DB* db)
: StackableDB(db),
ioptions_(db->GetOptions()),
writer_offset_(0),
next_sync_offset_(kBytesPerSync) {}
Status BlobDB::Open() {
unique_ptr<WritableFile> wfile;
EnvOptions env_options(db_->GetOptions());
Status s = ioptions_.env->NewWritableFile(db_->GetName() + "/" + kFileName,
&wfile, env_options);
if (!s.ok()) {
return s;
}
file_writer_.reset(new WritableFileWriter(std::move(wfile), env_options));
// Write version
std::string version;
PutFixed64(&version, 0);
s = file_writer_->Append(Slice(version));
if (!s.ok()) {
return s;
}
writer_offset_ += version.size();
std::unique_ptr<RandomAccessFile> rfile;
s = ioptions_.env->NewRandomAccessFile(db_->GetName() + "/" + kFileName,
&rfile, env_options);
if (!s.ok()) {
return s;
}
file_reader_.reset(new RandomAccessFileReader(std::move(rfile)));
return s;
}
Status BlobDB::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
BlockBuilder block_builder(1, false);
block_builder.Add(key, value);
CompressionType compression = CompressionType::kLZ4Compression;
CompressionOptions compression_opts;
Slice block_contents;
std::string compression_output;
block_contents = CompressBlock(block_builder.Finish(), compression_opts,
&compression, kBlockBasedTableVersionFormat,
Slice() /* dictionary */, &compression_output);
char header[kBlockHeaderSize];
char trailer[kBlockTrailerSize];
trailer[0] = compression;
auto crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
BlockHandle handle;
std::string index_entry;
Status s;
{
InstrumentedMutexLock l(&mutex_);
auto raw_block_size = block_contents.size();
EncodeFixed64(header, raw_block_size);
s = file_writer_->Append(Slice(header, kBlockHeaderSize));
writer_offset_ += kBlockHeaderSize;
if (s.ok()) {
handle.set_offset(writer_offset_);
handle.set_size(raw_block_size);
s = file_writer_->Append(block_contents);
}
if (s.ok()) {
s = file_writer_->Append(Slice(trailer, kBlockTrailerSize));
}
if (s.ok()) {
s = file_writer_->Flush();
}
if (s.ok() && writer_offset_ > next_sync_offset_) {
// Sync every kBytesPerSync. This is a hacky way to limit unsynced data.
next_sync_offset_ += kBytesPerSync;
s = file_writer_->Sync(db_->GetOptions().use_fsync);
}
if (s.ok()) {
writer_offset_ += block_contents.size() + kBlockTrailerSize;
// Put file number
PutVarint64(&index_entry, 0);
handle.EncodeTo(&index_entry);
s = db_->Put(options, key, index_entry);
}
}
return s;
}
Status BlobDB::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
std::string index_entry;
s = db_->Get(options, key, &index_entry);
if (!s.ok()) {
return s;
}
BlockHandle handle;
Slice index_entry_slice(index_entry);
uint64_t file_number;
if (!GetVarint64(&index_entry_slice, &file_number)) {
return Status::Corruption();
}
assert(file_number == 0);
s = handle.DecodeFrom(&index_entry_slice);
if (!s.ok()) {
return s;
}
Footer footer(0, kBlockBasedTableVersionFormat);
BlockContents contents;
s = ReadBlockContents(file_reader_.get(), footer, options, handle, &contents,
ioptions_);
if (!s.ok()) {
return s;
}
Block block(std::move(contents));
BlockIter bit;
InternalIterator* it = block.NewIterator(nullptr, &bit);
it->SeekToFirst();
if (!it->status().ok()) {
return it->status();
}
*value = it->value().ToString();
return s;
}
} // namespace rocksdb
#else
namespace rocksdb {
Status NewBlobDB(Options options, std::string dbname, DB** blob_db) {
return Status::NotSupported();
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,22 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/status.h"
namespace rocksdb {
// EXPERIMENAL ONLY
// A wrapped database which puts values of KV pairs in a separate log
// and store location to the log in the underlying DB.
// It lacks lots of importatant functionalities, e.g. DB restarts,
// garbage collection, iterators, etc.
//
// The factory needs to be moved to include/rocksdb/utilities to allow
// users to use blob DB.
extern Status NewBlobDB(Options options, std::string dbname, DB** blob_db);
} // namespace rocksdb

@ -0,0 +1,80 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include "util/random.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
class BlobDBTest : public testing::Test {
public:
BlobDBTest() {
dbname_ = test::TmpDir() + "/blob_db_test";
Options options;
options.create_if_missing = true;
EXPECT_TRUE(NewBlobDB(options, dbname_, &db_).ok());
}
~BlobDBTest() { delete db_; }
DB* db_;
std::string dbname_;
}; // class BlobDBTest
TEST_F(BlobDBTest, Basic) {
WriteOptions wo;
ReadOptions ro;
std::string value;
ASSERT_OK(db_->Put(wo, "foo", "v1"));
ASSERT_OK(db_->Put(wo, "bar", "v2"));
ASSERT_OK(db_->Get(ro, "foo", &value));
ASSERT_EQ("v1", value);
ASSERT_OK(db_->Get(ro, "bar", &value));
ASSERT_EQ("v2", value);
}
TEST_F(BlobDBTest, Large) {
WriteOptions wo;
ReadOptions ro;
std::string value1, value2, value3;
Random rnd(301);
value1.assign(8999, '1');
ASSERT_OK(db_->Put(wo, "foo", value1));
value2.assign(9001, '2');
ASSERT_OK(db_->Put(wo, "bar", value2));
test::RandomString(&rnd, 13333, &value3);
ASSERT_OK(db_->Put(wo, "barfoo", value3));
std::string value;
ASSERT_OK(db_->Get(ro, "foo", &value));
ASSERT_EQ(value1, value);
ASSERT_OK(db_->Get(ro, "bar", &value));
ASSERT_EQ(value2, value);
ASSERT_OK(db_->Get(ro, "barfoo", &value));
ASSERT_EQ(value3, value);
}
} // namespace rocksdb
// A black-box test for the ttl wrapper around rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int argc, char** argv) {
fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // !ROCKSDB_LITE
Loading…
Cancel
Save