diff --git a/CMakeLists.txt b/CMakeLists.txt index db80bd5ba..f985c2a46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -248,6 +248,7 @@ set(SOURCES util/xfunc.cc util/xxhash.cc utilities/backupable/backupable_db.cc + utilities/blob_db/blob_db.cc utilities/checkpoint/checkpoint.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/date_tiered/date_tiered_db_impl.cc @@ -434,6 +435,7 @@ set(TESTS util/thread_list_test.cc util/thread_local_test.cc utilities/backupable/backupable_db_test.cc + utilities/blob_db/blob_db_test.cc utilities/checkpoint/checkpoint_test.cc utilities/date_tiered/date_tiered_test.cc utilities/document/document_db_test.cc diff --git a/Makefile b/Makefile index 258663790..556d76207 100644 --- a/Makefile +++ b/Makefile @@ -338,6 +338,7 @@ TESTS = \ ttl_test \ date_tiered_test \ backupable_db_test \ + blob_db_test \ document_db_test \ json_document_test \ sim_cache_test \ @@ -998,6 +999,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS) backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +blob_db_test: utilities/blob_db/blob_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + checkpoint_test: utilities/checkpoint/checkpoint_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/src.mk b/src.mk index 0f41cb94f..fcea98db3 100644 --- a/src.mk +++ b/src.mk @@ -114,6 +114,7 @@ LIB_SOURCES = \ util/sharded_cache.cc \ util/sst_file_manager_impl.cc \ utilities/backupable/backupable_db.cc \ + utilities/blob_db/blob_db.cc \ utilities/convenience/info_log_finder.cc \ utilities/checkpoint/checkpoint.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ @@ -287,6 +288,7 @@ MAIN_SOURCES = \ util/histogram_test.cc \ util/statistics_test.cc \ utilities/backupable/backupable_db_test.cc \ + utilities/blob_db/blob_db_test.cc \ utilities/checkpoint/checkpoint_test.cc \ utilities/document/document_db_test.cc \ utilities/document/json_document_test.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 6af20ba85..6e8c0fd59 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -67,6 +67,7 @@ #include "util/testutil.h" #include "util/transaction_test_util.h" #include "util/xxhash.h" +#include "utilities/blob_db/blob_db.h" #include "utilities/merge_operators.h" #ifdef OS_WIN @@ -568,6 +569,8 @@ DEFINE_string( DEFINE_bool(report_bg_io_stats, false, "Measure times spents on I/Os while in compactions. "); +DEFINE_bool(use_blob_db, false, "Whether to use BlobDB. "); + enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { assert(ctype); @@ -2906,6 +2909,8 @@ class Benchmark { db->db = ptr; } #endif // ROCKSDB_LITE + } else if (FLAGS_use_blob_db) { + s = NewBlobDB(options, db_name, &db->db); } else { s = DB::Open(options, db_name, &db->db); } @@ -3057,7 +3062,10 @@ class Benchmark { for (int64_t j = 0; j < entries_per_batch_; j++) { int64_t rand_num = key_gens[id]->Next(); GenerateKeyFromInt(rand_num, FLAGS_num, &key); - if (FLAGS_num_column_families <= 1) { + if (FLAGS_use_blob_db) { + s = db_with_cfh->db->Put(write_options_, key, + gen.Generate(value_size_)); + } else if (FLAGS_num_column_families <= 1) { batch.Put(key, gen.Generate(value_size_)); } else { // We use same rand_num as seed for key and column family so that we @@ -3068,7 +3076,9 @@ class Benchmark { } bytes += value_size_ + key_size_; } - s = db_with_cfh->db->Write(write_options_, &batch); + if (!FLAGS_use_blob_db) { + s = db_with_cfh->db->Write(write_options_, &batch); + } thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, entries_per_batch_, kWrite); if (!s.ok()) { diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc new file mode 100644 index 000000000..fe37d5a19 --- /dev/null +++ b/utilities/blob_db/blob_db.cc @@ -0,0 +1,209 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "utilities/blob_db/blob_db.h" + +#ifndef ROCKSDB_LITE +#include "db/filename.h" +#include "db/write_batch_internal.h" +#include "rocksdb/convenience.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/utilities/stackable_db.h" +#include "table/block.h" +#include "table/block_based_table_builder.h" +#include "table/block_builder.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" +#include "util/instrumented_mutex.h" + +namespace rocksdb { + +namespace { +int kBlockBasedTableVersionFormat = 2; +} // namespace + +class BlobDB : public StackableDB { + public: + using rocksdb::StackableDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + using rocksdb::StackableDB::Get; + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override; + + Status Open(); + + explicit BlobDB(DB* db); + + private: + std::string dbname_; + ImmutableCFOptions ioptions_; + InstrumentedMutex mutex_; + std::unique_ptr file_reader_; + std::unique_ptr file_writer_; + size_t writer_offset_; + size_t next_sync_offset_; + + static const std::string kFileName; + static const size_t kBlockHeaderSize; + static const size_t kBytesPerSync; +}; + +Status NewBlobDB(Options options, std::string dbname, DB** blob_db) { + DB* db; + Status s = DB::Open(options, dbname, &db); + if (!s.ok()) { + return s; + } + BlobDB* bdb = new BlobDB(db); + s = bdb->Open(); + if (!s.ok()) { + delete bdb; + } + *blob_db = bdb; + return s; +} + +const std::string BlobDB::kFileName = "blob_log"; +const size_t BlobDB::kBlockHeaderSize = 8; +const size_t BlobDB::kBytesPerSync = 1024 * 1024 * 128; + +BlobDB::BlobDB(DB* db) + : StackableDB(db), + ioptions_(db->GetOptions()), + writer_offset_(0), + next_sync_offset_(kBytesPerSync) {} + +Status BlobDB::Open() { + unique_ptr wfile; + EnvOptions env_options(db_->GetOptions()); + Status s = ioptions_.env->NewWritableFile(db_->GetName() + "/" + kFileName, + &wfile, env_options); + if (!s.ok()) { + return s; + } + file_writer_.reset(new WritableFileWriter(std::move(wfile), env_options)); + + // Write version + std::string version; + PutFixed64(&version, 0); + s = file_writer_->Append(Slice(version)); + if (!s.ok()) { + return s; + } + writer_offset_ += version.size(); + + std::unique_ptr rfile; + s = ioptions_.env->NewRandomAccessFile(db_->GetName() + "/" + kFileName, + &rfile, env_options); + if (!s.ok()) { + return s; + } + file_reader_.reset(new RandomAccessFileReader(std::move(rfile))); + return s; +} + +Status BlobDB::Put(const WriteOptions& options, const Slice& key, + const Slice& value) { + BlockBuilder block_builder(1, false); + block_builder.Add(key, value); + + CompressionType compression = CompressionType::kLZ4Compression; + CompressionOptions compression_opts; + + Slice block_contents; + std::string compression_output; + + block_contents = CompressBlock(block_builder.Finish(), compression_opts, + &compression, kBlockBasedTableVersionFormat, + Slice() /* dictionary */, &compression_output); + + char header[kBlockHeaderSize]; + char trailer[kBlockTrailerSize]; + trailer[0] = compression; + auto crc = crc32c::Value(block_contents.data(), block_contents.size()); + crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type + EncodeFixed32(trailer + 1, crc32c::Mask(crc)); + + BlockHandle handle; + std::string index_entry; + Status s; + { + InstrumentedMutexLock l(&mutex_); + auto raw_block_size = block_contents.size(); + EncodeFixed64(header, raw_block_size); + s = file_writer_->Append(Slice(header, kBlockHeaderSize)); + writer_offset_ += kBlockHeaderSize; + if (s.ok()) { + handle.set_offset(writer_offset_); + handle.set_size(raw_block_size); + s = file_writer_->Append(block_contents); + } + if (s.ok()) { + s = file_writer_->Append(Slice(trailer, kBlockTrailerSize)); + } + if (s.ok()) { + s = file_writer_->Flush(); + } + if (s.ok() && writer_offset_ > next_sync_offset_) { + // Sync every kBytesPerSync. This is a hacky way to limit unsynced data. + next_sync_offset_ += kBytesPerSync; + s = file_writer_->Sync(db_->GetOptions().use_fsync); + } + if (s.ok()) { + writer_offset_ += block_contents.size() + kBlockTrailerSize; + // Put file number + PutVarint64(&index_entry, 0); + handle.EncodeTo(&index_entry); + s = db_->Put(options, key, index_entry); + } + } + return s; +} + +Status BlobDB::Get(const ReadOptions& options, const Slice& key, + std::string* value) { + Status s; + std::string index_entry; + s = db_->Get(options, key, &index_entry); + if (!s.ok()) { + return s; + } + BlockHandle handle; + Slice index_entry_slice(index_entry); + uint64_t file_number; + if (!GetVarint64(&index_entry_slice, &file_number)) { + return Status::Corruption(); + } + assert(file_number == 0); + s = handle.DecodeFrom(&index_entry_slice); + if (!s.ok()) { + return s; + } + Footer footer(0, kBlockBasedTableVersionFormat); + BlockContents contents; + s = ReadBlockContents(file_reader_.get(), footer, options, handle, &contents, + ioptions_); + if (!s.ok()) { + return s; + } + Block block(std::move(contents)); + BlockIter bit; + InternalIterator* it = block.NewIterator(nullptr, &bit); + it->SeekToFirst(); + if (!it->status().ok()) { + return it->status(); + } + *value = it->value().ToString(); + return s; +} +} // namespace rocksdb +#else +namespace rocksdb { +Status NewBlobDB(Options options, std::string dbname, DB** blob_db) { + return Status::NotSupported(); +} +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h new file mode 100644 index 000000000..b57e3cfba --- /dev/null +++ b/utilities/blob_db/blob_db.h @@ -0,0 +1,22 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include "rocksdb/db.h" +#include "rocksdb/status.h" + +namespace rocksdb { +// EXPERIMENAL ONLY +// A wrapped database which puts values of KV pairs in a separate log +// and store location to the log in the underlying DB. +// It lacks lots of importatant functionalities, e.g. DB restarts, +// garbage collection, iterators, etc. +// +// The factory needs to be moved to include/rocksdb/utilities to allow +// users to use blob DB. +extern Status NewBlobDB(Options options, std::string dbname, DB** blob_db); +} // namespace rocksdb diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc new file mode 100644 index 000000000..4d26ef0e4 --- /dev/null +++ b/utilities/blob_db/blob_db_test.cc @@ -0,0 +1,80 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_db.h" +#include "util/random.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { +class BlobDBTest : public testing::Test { + public: + BlobDBTest() { + dbname_ = test::TmpDir() + "/blob_db_test"; + Options options; + options.create_if_missing = true; + EXPECT_TRUE(NewBlobDB(options, dbname_, &db_).ok()); + } + + ~BlobDBTest() { delete db_; } + + DB* db_; + std::string dbname_; +}; // class BlobDBTest + +TEST_F(BlobDBTest, Basic) { + WriteOptions wo; + ReadOptions ro; + std::string value; + + ASSERT_OK(db_->Put(wo, "foo", "v1")); + ASSERT_OK(db_->Put(wo, "bar", "v2")); + + ASSERT_OK(db_->Get(ro, "foo", &value)); + ASSERT_EQ("v1", value); + ASSERT_OK(db_->Get(ro, "bar", &value)); + ASSERT_EQ("v2", value); +} + +TEST_F(BlobDBTest, Large) { + WriteOptions wo; + ReadOptions ro; + std::string value1, value2, value3; + Random rnd(301); + + value1.assign(8999, '1'); + ASSERT_OK(db_->Put(wo, "foo", value1)); + value2.assign(9001, '2'); + ASSERT_OK(db_->Put(wo, "bar", value2)); + test::RandomString(&rnd, 13333, &value3); + ASSERT_OK(db_->Put(wo, "barfoo", value3)); + + std::string value; + ASSERT_OK(db_->Get(ro, "foo", &value)); + ASSERT_EQ(value1, value); + ASSERT_OK(db_->Get(ro, "bar", &value)); + ASSERT_EQ(value2, value); + ASSERT_OK(db_->Get(ro, "barfoo", &value)); + ASSERT_EQ(value3, value); +} + +} // namespace rocksdb + +// A black-box test for the ttl wrapper around rocksdb +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE