From 8b79422b5231933dfda9adc690045e7fd9ab06ce Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 9 Aug 2016 10:16:32 -0700 Subject: [PATCH] [Proof-Of-Concept] RocksDB Blob Storage with a blob log file. Summary: This is a proof of concept of a RocksDB blob log file. The actual value of the Put() is appended to a blob log using normal data block format, and the handle of the block is written as the value of the key in RocksDB. The prototype only supports Put() and Get(). It doesn't support DB restart, garbage collection, Write() call, iterator, snapshots, etc. Test Plan: Add unit tests. Reviewers: arahut Reviewed By: arahut Subscribers: kradhakrishnan, leveldb, andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D61485 --- CMakeLists.txt | 2 + Makefile | 4 + src.mk | 2 + tools/db_bench_tool.cc | 14 +- utilities/blob_db/blob_db.cc | 209 ++++++++++++++++++++++++++++++ utilities/blob_db/blob_db.h | 22 ++++ utilities/blob_db/blob_db_test.cc | 80 ++++++++++++ 7 files changed, 331 insertions(+), 2 deletions(-) create mode 100644 utilities/blob_db/blob_db.cc create mode 100644 utilities/blob_db/blob_db.h create mode 100644 utilities/blob_db/blob_db_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index db80bd5ba..f985c2a46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -248,6 +248,7 @@ set(SOURCES util/xfunc.cc util/xxhash.cc utilities/backupable/backupable_db.cc + utilities/blob_db/blob_db.cc utilities/checkpoint/checkpoint.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/date_tiered/date_tiered_db_impl.cc @@ -434,6 +435,7 @@ set(TESTS util/thread_list_test.cc util/thread_local_test.cc utilities/backupable/backupable_db_test.cc + utilities/blob_db/blob_db_test.cc utilities/checkpoint/checkpoint_test.cc utilities/date_tiered/date_tiered_test.cc utilities/document/document_db_test.cc diff --git a/Makefile b/Makefile index 258663790..556d76207 100644 --- a/Makefile +++ b/Makefile @@ -338,6 +338,7 @@ TESTS = \ ttl_test \ 
date_tiered_test \ backupable_db_test \ + blob_db_test \ document_db_test \ json_document_test \ sim_cache_test \ @@ -998,6 +999,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS) backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + checkpoint_test: utilities/checkpoint/checkpoint_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/src.mk b/src.mk index 0f41cb94f..fcea98db3 100644 --- a/src.mk +++ b/src.mk @@ -114,6 +114,7 @@ LIB_SOURCES = \ util/sharded_cache.cc \ util/sst_file_manager_impl.cc \ utilities/backupable/backupable_db.cc \ + utilities/blob_db/blob_db.cc \ utilities/convenience/info_log_finder.cc \ utilities/checkpoint/checkpoint.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ @@ -287,6 +288,7 @@ MAIN_SOURCES = \ util/histogram_test.cc \ util/statistics_test.cc \ utilities/backupable/backupable_db_test.cc \ + utilities/blob_db/blob_db_test.cc \ utilities/checkpoint/checkpoint_test.cc \ utilities/document/document_db_test.cc \ utilities/document/json_document_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 6af20ba85..6e8c0fd59 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -67,6 +67,7 @@ #include "util/testutil.h" #include "util/transaction_test_util.h" #include "util/xxhash.h" +#include "utilities/blob_db/blob_db.h" #include "utilities/merge_operators.h" #ifdef OS_WIN @@ -568,6 +569,8 @@ DEFINE_string( DEFINE_bool(report_bg_io_stats, false, "Measure times spents on I/Os while in compactions. "); +DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. 
"); + enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { assert(ctype); @@ -2906,6 +2909,8 @@ class Benchmark { db->db = ptr; } #endif // ROCKSDB_LITE + } else if (FLAGS_use_blob_db) { + s = NewBlobDB(options, db_name, &db->db); } else { s = DB::Open(options, db_name, &db->db); } @@ -3057,7 +3062,10 @@ class Benchmark { for (int64_t j = 0; j < entries_per_batch_; j++) { int64_t rand_num = key_gens[id]->Next(); GenerateKeyFromInt(rand_num, FLAGS_num, &key); - if (FLAGS_num_column_families <= 1) { + if (FLAGS_use_blob_db) { + s = db_with_cfh->db->Put(write_options_, key, + gen.Generate(value_size_)); + } else if (FLAGS_num_column_families <= 1) { batch.Put(key, gen.Generate(value_size_)); } else { // We use same rand_num as seed for key and column family so that we @@ -3068,7 +3076,9 @@ class Benchmark { } bytes += value_size_ + key_size_; } - s = db_with_cfh->db->Write(write_options_, &batch); + if (!FLAGS_use_blob_db) { + s = db_with_cfh->db->Write(write_options_, &batch); + } thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, entries_per_batch_, kWrite); if (!s.ok()) { diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc new file mode 100644 index 000000000..fe37d5a19 --- /dev/null +++ b/utilities/blob_db/blob_db.cc @@ -0,0 +1,209 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include "utilities/blob_db/blob_db.h" + +#ifndef ROCKSDB_LITE +#include "db/filename.h" +#include "db/write_batch_internal.h" +#include "rocksdb/convenience.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/utilities/stackable_db.h" +#include "table/block.h" +#include "table/block_based_table_builder.h" +#include "table/block_builder.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" +#include "util/instrumented_mutex.h" + +namespace rocksdb { + +namespace { +int kBlockBasedTableVersionFormat = 2; +} // namespace + +class BlobDB : public StackableDB { + public: + using rocksdb::StackableDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + using rocksdb::StackableDB::Get; + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override; + + Status Open(); + + explicit BlobDB(DB* db); + + private: + std::string dbname_; + ImmutableCFOptions ioptions_; + InstrumentedMutex mutex_; + std::unique_ptr file_reader_; + std::unique_ptr file_writer_; + size_t writer_offset_; + size_t next_sync_offset_; + + static const std::string kFileName; + static const size_t kBlockHeaderSize; + static const size_t kBytesPerSync; +}; + +Status NewBlobDB(Options options, std::string dbname, DB** blob_db) { + DB* db; + Status s = DB::Open(options, dbname, &db); + if (!s.ok()) { + return s; + } + BlobDB* bdb = new BlobDB(db); + s = bdb->Open(); + if (!s.ok()) { + delete bdb; + } + *blob_db = bdb; + return s; +} + +const std::string BlobDB::kFileName = "blob_log"; +const size_t BlobDB::kBlockHeaderSize = 8; +const size_t BlobDB::kBytesPerSync = 1024 * 1024 * 128; + +BlobDB::BlobDB(DB* db) + : StackableDB(db), + ioptions_(db->GetOptions()), + writer_offset_(0), + next_sync_offset_(kBytesPerSync) {} + +Status BlobDB::Open() { + unique_ptr wfile; + EnvOptions env_options(db_->GetOptions()); + Status s = ioptions_.env->NewWritableFile(db_->GetName() + "/" + kFileName, + 
&wfile, env_options); + if (!s.ok()) { + return s; + } + file_writer_.reset(new WritableFileWriter(std::move(wfile), env_options)); + + // Write version + std::string version; + PutFixed64(&version, 0); + s = file_writer_->Append(Slice(version)); + if (!s.ok()) { + return s; + } + writer_offset_ += version.size(); + + std::unique_ptr rfile; + s = ioptions_.env->NewRandomAccessFile(db_->GetName() + "/" + kFileName, + &rfile, env_options); + if (!s.ok()) { + return s; + } + file_reader_.reset(new RandomAccessFileReader(std::move(rfile))); + return s; +} + +Status BlobDB::Put(const WriteOptions& options, const Slice& key, + const Slice& value) { + BlockBuilder block_builder(1, false); + block_builder.Add(key, value); + + CompressionType compression = CompressionType::kLZ4Compression; + CompressionOptions compression_opts; + + Slice block_contents; + std::string compression_output; + + block_contents = CompressBlock(block_builder.Finish(), compression_opts, + &compression, kBlockBasedTableVersionFormat, + Slice() /* dictionary */, &compression_output); + + char header[kBlockHeaderSize]; + char trailer[kBlockTrailerSize]; + trailer[0] = compression; + auto crc = crc32c::Value(block_contents.data(), block_contents.size()); + crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type + EncodeFixed32(trailer + 1, crc32c::Mask(crc)); + + BlockHandle handle; + std::string index_entry; + Status s; + { + InstrumentedMutexLock l(&mutex_); + auto raw_block_size = block_contents.size(); + EncodeFixed64(header, raw_block_size); + s = file_writer_->Append(Slice(header, kBlockHeaderSize)); + writer_offset_ += kBlockHeaderSize; + if (s.ok()) { + handle.set_offset(writer_offset_); + handle.set_size(raw_block_size); + s = file_writer_->Append(block_contents); + } + if (s.ok()) { + s = file_writer_->Append(Slice(trailer, kBlockTrailerSize)); + } + if (s.ok()) { + s = file_writer_->Flush(); + } + if (s.ok() && writer_offset_ > next_sync_offset_) { + // Sync every kBytesPerSync. 
This is a hacky way to limit unsynced data. + next_sync_offset_ += kBytesPerSync; + s = file_writer_->Sync(db_->GetOptions().use_fsync); + } + if (s.ok()) { + writer_offset_ += block_contents.size() + kBlockTrailerSize; + // Put file number + PutVarint64(&index_entry, 0); + handle.EncodeTo(&index_entry); + s = db_->Put(options, key, index_entry); + } + } + return s; +} + +Status BlobDB::Get(const ReadOptions& options, const Slice& key, + std::string* value) { + Status s; + std::string index_entry; + s = db_->Get(options, key, &index_entry); + if (!s.ok()) { + return s; + } + BlockHandle handle; + Slice index_entry_slice(index_entry); + uint64_t file_number; + if (!GetVarint64(&index_entry_slice, &file_number)) { + return Status::Corruption(); + } + assert(file_number == 0); + s = handle.DecodeFrom(&index_entry_slice); + if (!s.ok()) { + return s; + } + Footer footer(0, kBlockBasedTableVersionFormat); + BlockContents contents; + s = ReadBlockContents(file_reader_.get(), footer, options, handle, &contents, + ioptions_); + if (!s.ok()) { + return s; + } + Block block(std::move(contents)); + BlockIter bit; + InternalIterator* it = block.NewIterator(nullptr, &bit); + it->SeekToFirst(); + if (!it->status().ok()) { + return it->status(); + } + *value = it->value().ToString(); + return s; +} +} // namespace rocksdb +#else +namespace rocksdb { +Status NewBlobDB(Options options, std::string dbname, DB** blob_db) { + return Status::NotSupported(); +} +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h new file mode 100644 index 000000000..b57e3cfba --- /dev/null +++ b/utilities/blob_db/blob_db.h @@ -0,0 +1,22 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#pragma once + +#include +#include "rocksdb/db.h" +#include "rocksdb/status.h" + +namespace rocksdb { +// EXPERIMENTAL ONLY +// A wrapped database which puts values of KV pairs in a separate log +// and stores the location in the log in the underlying DB. +// It lacks lots of important functionalities, e.g. DB restarts, +// garbage collection, iterators, etc. +// +// The factory needs to be moved to include/rocksdb/utilities to allow +// users to use blob DB. +extern Status NewBlobDB(Options options, std::string dbname, DB** blob_db); +} // namespace rocksdb diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc new file mode 100644 index 000000000..4d26ef0e4 --- /dev/null +++ b/utilities/blob_db/blob_db_test.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_db.h" +#include "util/random.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { +class BlobDBTest : public testing::Test { + public: + BlobDBTest() { + dbname_ = test::TmpDir() + "/blob_db_test"; + Options options; + options.create_if_missing = true; + EXPECT_TRUE(NewBlobDB(options, dbname_, &db_).ok()); + } + + ~BlobDBTest() { delete db_; } + + DB* db_; + std::string dbname_; }; // class BlobDBTest + +TEST_F(BlobDBTest, Basic) { + WriteOptions wo; + ReadOptions ro; + std::string value; + + ASSERT_OK(db_->Put(wo, "foo", "v1")); + ASSERT_OK(db_->Put(wo, "bar", "v2")); + + ASSERT_OK(db_->Get(ro, "foo", &value)); + ASSERT_EQ("v1", value); + ASSERT_OK(db_->Get(ro, "bar", &value)); + ASSERT_EQ("v2", value); +} + +TEST_F(BlobDBTest, Large) { + WriteOptions wo; + ReadOptions ro; + std::string value1, value2, value3; + Random rnd(301); + + value1.assign(8999, '1'); + ASSERT_OK(db_->Put(wo, "foo", value1)); + 
value2.assign(9001, '2'); + ASSERT_OK(db_->Put(wo, "bar", value2)); + test::RandomString(&rnd, 13333, &value3); + ASSERT_OK(db_->Put(wo, "barfoo", value3)); + + std::string value; + ASSERT_OK(db_->Get(ro, "foo", &value)); + ASSERT_EQ(value1, value); + ASSERT_OK(db_->Get(ro, "bar", &value)); + ASSERT_EQ(value2, value); + ASSERT_OK(db_->Get(ro, "barfoo", &value)); + ASSERT_EQ(value3, value); +} + +} // namespace rocksdb + +// A black-box test for the blob db wrapper around rocksdb +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE