Add options.write_buffer_manager: control total memtable size across DB instances

Summary: Add option write_buffer_manager to help users control total memory spent on memtables across multiple DB instances.

Test Plan: Add a new unit test.

Reviewers: yhchiang, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: adela, benj, sumeet, muthu, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59925
main
sdong 8 years ago
parent 5aaef91d4a
commit 32df9733d1
  1. 1
      HISTORY.md
  2. 18
      db/column_family.cc
  3. 9
      db/column_family.h
  4. 10
      db/compaction_job_test.cc
  5. 20
      db/db_impl.cc
  6. 4
      db/db_impl.h
  7. 124
      db/db_test.cc
  8. 219
      db/db_test2.cc
  9. 4
      db/db_wal_test.cc
  10. 12
      db/flush_job_test.cc
  11. 7
      db/memtable.cc
  12. 4
      db/memtable.h
  13. 23
      db/memtable_allocator.cc
  14. 10
      db/memtable_allocator.h
  15. 22
      db/memtable_list_test.cc
  16. 4
      db/memtablerep_bench.cc
  17. 4
      db/repair.cc
  18. 12
      db/version_set.cc
  19. 5
      db/version_set.h
  20. 10
      db/wal_manager_test.cc
  21. 6
      db/write_batch_test.cc
  22. 19
      include/rocksdb/options.h
  23. 15
      include/rocksdb/write_buffer_manager.h
  24. 16
      java/rocksjni/write_batch.cc
  25. 6
      java/rocksjni/write_batch_test.cc
  26. 14
      table/table_test.cc
  27. 6
      tools/ldb_cmd.cc
  28. 1
      util/options.cc
  29. 2
      util/options_settable_test.cc

@ -9,6 +9,7 @@
* Add avoid_flush_during_recovery option.
* Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread.
* RepairDB support for column families. RepairDB now associates data with non-default column families using information embedded in the SST/WAL files (4.7 or later). For data written by 4.6 or earlier, RepairDB associates it with the default column family.
* Add options.write_buffer_manager which allows users to control total memtable sizes across multiple DB instances.
## 4.9.0 (6/9/2016)
### Public API changes

@ -26,7 +26,6 @@
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "db/writebuffer.h"
#include "memtable/hash_skiplist_rep.h"
#include "util/autovector.h"
#include "util/compression.h"
@ -330,7 +329,7 @@ void SuperVersionUnrefHandle(void* ptr) {
ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBuffer* write_buffer,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
const EnvOptions& env_options, ColumnFamilySet* column_family_set)
: id_(id),
@ -344,7 +343,7 @@ ColumnFamilyData::ColumnFamilyData(
SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_, ioptions_),
write_buffer_(write_buffer),
write_buffer_manager_(write_buffer_manager),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge,
options_.max_write_buffer_number_to_maintain),
@ -676,7 +675,7 @@ MemTable* ColumnFamilyData::ConstructNewMemtable(
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
assert(current_ != nullptr);
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
write_buffer_, earliest_seq);
write_buffer_manager_, earliest_seq);
}
void ColumnFamilyData::CreateNewMemtable(
@ -855,7 +854,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& env_options,
Cache* table_cache,
WriteBuffer* write_buffer,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
@ -866,7 +865,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
db_options_(db_options),
env_options_(env_options),
table_cache_(table_cache),
write_buffer_(write_buffer),
write_buffer_manager_(write_buffer_manager),
write_controller_(write_controller) {
// initialize linked list
dummy_cfd_->prev_ = dummy_cfd_;
@ -929,10 +928,9 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd =
new ColumnFamilyData(id, name, dummy_versions, table_cache_,
write_buffer_, options, db_options_,
env_options_, this);
ColumnFamilyData* new_cfd = new ColumnFamilyData(
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
db_options_, env_options_, this);
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
max_column_family_ = std::max(max_column_family_, id);

@ -323,7 +323,7 @@ class ColumnFamilyData {
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBuffer* write_buffer,
WriteBufferManager* write_buffer_manager,
const ColumnFamilyOptions& options,
const DBOptions* db_options, const EnvOptions& env_options,
ColumnFamilySet* column_family_set);
@ -348,7 +348,7 @@ class ColumnFamilyData {
std::unique_ptr<InternalStats> internal_stats_;
WriteBuffer* write_buffer_;
WriteBufferManager* write_buffer_manager_;
MemTable* mem_;
MemTableList imm_;
@ -438,7 +438,8 @@ class ColumnFamilySet {
ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBuffer* write_buffer, WriteController* write_controller);
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~ColumnFamilySet();
ColumnFamilyData* GetDefault() const;
@ -495,7 +496,7 @@ class ColumnFamilySet {
const DBOptions* const db_options_;
const EnvOptions env_options_;
Cache* table_cache_;
WriteBuffer* write_buffer_;
WriteBufferManager* write_buffer_manager_;
WriteController* write_controller_;
};

@ -10,13 +10,13 @@
#include <string>
#include <tuple>
#include "db/compaction_job.h"
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
@ -70,9 +70,9 @@ class CompactionJobTest : public testing::Test {
dbname_(test::TmpDir() + "/compaction_job_test"),
mutable_cf_options_(Options(), ImmutableCFOptions(Options())),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_(db_options_.db_write_buffer_size),
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()) {
@ -285,7 +285,7 @@ class CompactionJobTest : public testing::Test {
WriteController write_controller_;
DBOptions db_options_;
ColumnFamilyOptions cf_options_;
WriteBuffer write_buffer_;
WriteBufferManager write_buffer_manager_;
std::unique_ptr<VersionSet> versions_;
InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;

@ -57,7 +57,6 @@
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "db/writebuffer.h"
#include "db/xfunc_test_points.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
@ -73,6 +72,7 @@
#include "rocksdb/table.h"
#include "rocksdb/version.h"
#include "rocksdb/wal_filter.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
@ -147,6 +147,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.info_log = nullptr;
}
}
if (!result.write_buffer_manager) {
result.write_buffer_manager.reset(
new WriteBufferManager(result.db_write_buffer_size));
}
if (result.base_background_compactions == -1) {
result.base_background_compactions = result.max_background_compactions;
}
@ -315,7 +319,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
total_log_size_(0),
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_(options.db_write_buffer_size),
write_buffer_manager_(db_options_.write_buffer_manager.get()),
write_thread_(options.enable_write_thread_adaptive_yield
? options.write_thread_max_yield_usec
: 0,
@ -355,7 +359,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits);
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_,
table_cache_.get(), write_buffer_manager_,
&write_controller_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
@ -4420,11 +4424,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}
}
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) {
} else if (UNLIKELY(write_buffer_manager_->ShouldFlush())) {
// Before a new memtable is added in SwitchMemtable(),
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
// thread is writing to another DB with the same write buffer, they may also
// be flushed. We may end up with flushing much more DBs than needed. It's
// suboptimal but still correct.
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing column family with largest mem table size. Write buffer is "
"using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
write_buffer_.memory_usage(), write_buffer_.buffer_size());
write_buffer_manager_->memory_usage(),
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
ColumnFamilyData* largest_cfd = nullptr;

@ -31,13 +31,13 @@
#include "db/wal_manager.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
#include "db/writebuffer.h"
#include "memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "util/autovector.h"
#include "util/event_logger.h"
@ -803,7 +803,7 @@ class DBImpl : public DB {
Directories directories_;
WriteBuffer write_buffer_;
WriteBufferManager* write_buffer_manager_;
WriteThread write_thread_;

@ -2285,130 +2285,6 @@ TEST_F(DBTest, FlushOneColumnFamily) {
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, SharedWriteBuffer) {
Options options = CurrentOptions();
options.db_write_buffer_size = 100000; // this is the real limit
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
// Trigger a flush on CF "nikitich"
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
ASSERT_OK(Put(2, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// "dobrynia": 20KB
// Flush 'dobrynia'
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(70000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// "nikitich" still has data of 80KB
// Inserting Data in "dobrynia" triggers "nikitich" flushing.
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "dobrynia" still has 40KB
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
ASSERT_OK(Put(0, Key(1), DummyString(10000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers no flush
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
ASSERT_OK(Put(1, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers flush of "pikachu"
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "dobrynia": 40KB
// Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
// closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(3));
}
}
#endif // ROCKSDB_LITE
TEST_F(DBTest, PurgeInfoLogs) {
Options options = CurrentOptions();
options.keep_log_file_num = 5;

@ -144,6 +144,225 @@ TEST_F(DBTest2, CacheIndexAndFilterWithDBRestart) {
}
#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
: public DBTestBase,
public testing::WithParamInterface<bool> {
public:
DBTestSharedWriteBufferAcrossCFs()
: DBTestBase("/db_test_shared_write_buffer") {}
void SetUp() override { use_old_interface_ = GetParam(); }
bool use_old_interface_;
};
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
Options options = CurrentOptions();
if (use_old_interface_) {
options.db_write_buffer_size = 100000; // this is the real limit
} else {
options.write_buffer_manager.reset(new WriteBufferManager(100000));
}
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
// Trigger a flush on CF "nikitich"
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
ASSERT_OK(Put(2, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// "dobrynia": 20KB
// Flush 'dobrynia'
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(70000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// "nikitich" still has data of 80KB
// Inserting Data in "dobrynia" triggers "nikitich" flushing.
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "dobrynia" still has 40KB
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
ASSERT_OK(Put(0, Key(1), DummyString(10000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers no flush
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
ASSERT_OK(Put(1, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers flush of "pikachu"
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "dobrynia": 40KB
// Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
// closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(3));
}
}
INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
DBTestSharedWriteBufferAcrossCFs, ::testing::Bool());
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2";
Options options = CurrentOptions();
options.write_buffer_size = 500000; // this is never hit
options.write_buffer_manager.reset(new WriteBufferManager(100000));
CreateAndReopenWithCF({"cf1", "cf2"}, options);
ASSERT_OK(DestroyDB(dbname2, options));
DB* db2 = nullptr;
ASSERT_OK(DB::Open(options, dbname2, &db2));
WriteOptions wo;
// Trigger a flush on cf2
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(2, Key(1), DummyString(90000)));
// Insert to DB2
ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(0));
}
// db2: 20KB
ASSERT_OK(db2->Put(wo, Key(2), DummyString(40000)));
ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(1));
}
//
// Inserting Data in db2 and db_ triggers flushing in db_.
ASSERT_OK(db2->Put(wo, Key(3), DummyString(70000)));
ASSERT_OK(Put(2, Key(2), DummyString(45000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf1"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "cf2"),
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db2, "default"),
static_cast<uint64_t>(1));
}
delete db2;
ASSERT_OK(DestroyDB(dbname2, options));
}
namespace {
void ValidateKeyExistence(DB* db, const std::vector<Slice>& keys_must_exist,
const std::vector<Slice>& keys_must_not_exist) {

@ -494,14 +494,14 @@ class RecoveryTestHelper {
shared_ptr<Cache> table_cache = NewLRUCache(50000, 16);
EnvOptions env_options;
WriteBuffer write_buffer(db_options.db_write_buffer_size);
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
unique_ptr<VersionSet> versions;
unique_ptr<WalManager> wal_manager;
WriteController write_controller;
versions.reset(new VersionSet(test->dbname_, &db_options, env_options,
table_cache.get(), &write_buffer,
table_cache.get(), &write_buffer_manager,
&write_controller));
wal_manager.reset(new WalManager(db_options, env_options));

@ -7,16 +7,16 @@
#include <map>
#include <string>
#include "db/flush_job.h"
#include "db/column_family.h"
#include "db/flush_job.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "rocksdb/cache.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/mock_table.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "table/mock_table.h"
namespace rocksdb {
@ -29,9 +29,9 @@ class FlushJobTest : public testing::Test {
: env_(Env::Default()),
dbname_(test::TmpDir() + "/flush_job_test"),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_(db_options_.db_write_buffer_size),
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_)),
shutting_down_(false),
mock_table_factory_(new mock::MockTableFactory()) {
@ -77,7 +77,7 @@ class FlushJobTest : public testing::Test {
std::shared_ptr<Cache> table_cache_;
WriteController write_controller_;
DBOptions db_options_;
WriteBuffer write_buffer_;
WriteBufferManager write_buffer_manager_;
ColumnFamilyOptions cf_options_;
std::unique_ptr<VersionSet> versions_;
InstrumentedMutex mutex_;

@ -17,12 +17,12 @@
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/writebuffer.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/internal_iterator.h"
#include "table/merger.h"
#include "util/arena.h"
@ -57,13 +57,14 @@ MemTableOptions::MemTableOptions(const ImmutableCFOptions& ioptions,
MemTable::MemTable(const InternalKeyComparator& cmp,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBuffer* write_buffer, SequenceNumber earliest_seq)
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq)
: comparator_(cmp),
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
arena_(moptions_.arena_block_size, 0),
allocator_(&arena_, write_buffer),
allocator_(&arena_, write_buffer_manager),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &allocator_, ioptions.prefix_extractor,
ioptions.info_log)),

@ -32,7 +32,6 @@ namespace rocksdb {
class Mutex;
class MemTableIterator;
class MergeContext;
class WriteBuffer;
class InternalIterator;
struct MemTableOptions {
@ -91,7 +90,8 @@ class MemTable {
explicit MemTable(const InternalKeyComparator& comparator,
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
WriteBuffer* write_buffer, SequenceNumber earliest_seq);
WriteBufferManager* write_buffer_manager,
SequenceNumber earliest_seq);
// Do not delete this MemTable unless Unref() indicates it not in use.
~MemTable();

@ -10,36 +10,39 @@
#include "db/memtable_allocator.h"
#include <assert.h>
#include "db/writebuffer.h"
#include "rocksdb/write_buffer_manager.h"
#include "util/arena.h"
namespace rocksdb {
MemTableAllocator::MemTableAllocator(Allocator* allocator,
WriteBuffer* write_buffer)
: allocator_(allocator), write_buffer_(write_buffer), bytes_allocated_(0) {}
WriteBufferManager* write_buffer_manager)
: allocator_(allocator),
write_buffer_manager_(write_buffer_manager),
bytes_allocated_(0) {}
MemTableAllocator::~MemTableAllocator() { DoneAllocating(); }
char* MemTableAllocator::Allocate(size_t bytes) {
assert(write_buffer_ != nullptr);
assert(write_buffer_manager_ != nullptr);
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_->ReserveMem(bytes);
write_buffer_manager_->ReserveMem(bytes);
return allocator_->Allocate(bytes);
}
char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size,
Logger* logger) {
assert(write_buffer_ != nullptr);
assert(write_buffer_manager_ != nullptr);
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_->ReserveMem(bytes);
write_buffer_manager_->ReserveMem(bytes);
return allocator_->AllocateAligned(bytes, huge_page_size, logger);
}
void MemTableAllocator::DoneAllocating() {
if (write_buffer_ != nullptr) {
write_buffer_->FreeMem(bytes_allocated_.load(std::memory_order_relaxed));
write_buffer_ = nullptr;
if (write_buffer_manager_ != nullptr) {
write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
write_buffer_manager_ = nullptr;
}
}

@ -8,21 +8,23 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// This is used by the MemTable to allocate write buffer memory. It connects
// to WriteBuffer so we can track and enforce overall write buffer limits.
// to WriteBufferManager so we can track and enforce overall write buffer
// limits.
#pragma once
#include <atomic>
#include "rocksdb/write_buffer_manager.h"
#include "util/allocator.h"
namespace rocksdb {
class Logger;
class WriteBuffer;
class MemTableAllocator : public Allocator {
public:
explicit MemTableAllocator(Allocator* allocator, WriteBuffer* write_buffer);
explicit MemTableAllocator(Allocator* allocator,
WriteBufferManager* write_buffer_manager);
~MemTableAllocator();
// Allocator interface
@ -37,7 +39,7 @@ class MemTableAllocator : public Allocator {
private:
Allocator* allocator_;
WriteBuffer* write_buffer_;
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
// No copying allowed

@ -3,19 +3,19 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/memtable_list.h"
#include <algorithm>
#include <string>
#include <vector>
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "db/writebuffer.h"
#include "rocksdb/db.h"
#include "rocksdb/status.h"
#include "util/testutil.h"
#include "rocksdb/write_buffer_manager.h"
#include "util/string_util.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
@ -59,12 +59,12 @@ class MemTableListTest : public testing::Test {
DBOptions db_options;
EnvOptions env_options;
shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
WriteBuffer write_buffer(db_options.db_write_buffer_size);
WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
WriteController write_controller(10000000u);
CreateDB();
VersionSet versions(dbname, &db_options, env_options, table_cache.get(),
&write_buffer, &write_controller);
&write_buffer_manager, &write_controller);
// Create mock default ColumnFamilyData
ColumnFamilyOptions cf_options;
@ -126,7 +126,7 @@ TEST_F(MemTableListTest, GetTest) {
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
@ -163,7 +163,7 @@ TEST_F(MemTableListTest, GetTest) {
SequenceNumber saved_seq = seq;
// Create another memtable and write some keys to it
WriteBuffer wb2(options.db_write_buffer_size);
WriteBufferManager wb2(options.db_write_buffer_size);
MemTable* mem2 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2,
kMaxSequenceNumber);
@ -228,7 +228,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);
@ -303,7 +303,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_EQ("value2.2", value);
// Create another memtable and write some keys to it
WriteBuffer wb2(options.db_write_buffer_size);
WriteBufferManager wb2(options.db_write_buffer_size);
MemTable* mem2 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb2,
kMaxSequenceNumber);
@ -329,7 +329,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
ASSERT_EQ(0, to_delete.size());
// Add a third memtable to push the first memtable out of the history
WriteBuffer wb3(options.db_write_buffer_size);
WriteBufferManager wb3(options.db_write_buffer_size);
MemTable* mem3 =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb3,
kMaxSequenceNumber);
@ -390,7 +390,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
InternalKeyComparator cmp(BytewiseComparator());
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
autovector<MemTable*> to_delete;
// Create MemTableList

@ -28,13 +28,13 @@ int main() {
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/writebuffer.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/write_buffer_manager.h"
#include "util/arena.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
@ -622,7 +622,7 @@ int main(int argc, char** argv) {
rocksdb::BytewiseComparator());
rocksdb::MemTable::KeyComparator key_comp(internal_key_comp);
rocksdb::Arena arena;
rocksdb::WriteBuffer wb(FLAGS_write_buffer_size);
rocksdb::WriteBufferManager wb(FLAGS_write_buffer_size);
rocksdb::MemTableAllocator memtable_allocator(&arena, &wb);
uint64_t sequence;
auto createMemtableRep = [&] {

@ -75,12 +75,12 @@
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
@ -229,7 +229,7 @@ class Repairer {
const bool create_unknown_cfs_;
std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_;
WriteBuffer wb_;
WriteBufferManager wb_;
WriteController wc_;
VersionSet vset_;
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;

@ -33,9 +33,9 @@
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/writebuffer.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
@ -2086,11 +2086,11 @@ struct VersionSet::ManifestWriter {
VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& storage_options, Cache* table_cache,
WriteBuffer* write_buffer,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: column_family_set_(new ColumnFamilySet(
dbname, db_options, storage_options, table_cache,
write_buffer, write_controller)),
: column_family_set_(
new ColumnFamilySet(dbname, db_options, storage_options, table_cache,
write_buffer_manager, write_controller)),
env_(db_options->env),
dbname_(dbname),
db_options_(db_options),
@ -2837,7 +2837,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
options->table_cache_numshardbits));
WriteController wc(options->delayed_write_rate);
WriteBuffer wb(options->db_write_buffer_size);
WriteBufferManager wb(options->db_write_buffer_size);
VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc);
Status status;

@ -55,7 +55,7 @@ class LookupKey;
class MemTable;
class Version;
class VersionSet;
class WriteBuffer;
class WriteBufferManager;
class MergeContext;
class ColumnFamilySet;
class TableCache;
@ -575,7 +575,8 @@ class VersionSet {
public:
VersionSet(const std::string& dbname, const DBOptions* db_options,
const EnvOptions& env_options, Cache* table_cache,
WriteBuffer* write_buffer, WriteController* write_controller);
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~VersionSet();
// Apply *edit to the current version to form a new descriptor that

@ -10,12 +10,12 @@
#include "rocksdb/cache.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "db/wal_manager.h"
#include "db/log_writer.h"
#include "db/column_family.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
#include "util/file_reader_writer.h"
#include "util/mock_env.h"
#include "util/string_util.h"
@ -34,7 +34,7 @@ class WalManagerTest : public testing::Test {
: env_(new MockEnv(Env::Default())),
dbname_(test::TmpDir() + "/wal_manager_test"),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_(db_options_.db_write_buffer_size),
write_buffer_manager_(db_options_.db_write_buffer_size),
current_log_number_(0) {
DestroyDB(dbname_, Options());
}
@ -48,7 +48,7 @@ class WalManagerTest : public testing::Test {
db_options_.env = env_.get();
versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_buffer_,
table_cache_.get(), &write_buffer_manager_,
&write_controller_));
wal_manager_.reset(new WalManager(db_options_, env_options_));
@ -104,7 +104,7 @@ class WalManagerTest : public testing::Test {
EnvOptions env_options_;
std::shared_ptr<Cache> table_cache_;
DBOptions db_options_;
WriteBuffer write_buffer_;
WriteBufferManager write_buffer_manager_;
std::unique_ptr<VersionSet> versions_;
std::unique_ptr<WalManager> wal_manager_;
@ -128,7 +128,7 @@ TEST_F(WalManagerTest, ReadFirstRecordCache) {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), EnvOptions()));
log::Writer writer(std::move(file_writer), 1,
db_options_.recycle_log_file_num > 0);
db_options_.recycle_log_file_num > 0);
WriteBatch batch;
batch.Put("foo", "bar");
WriteBatchInternal::SetSequence(&batch, 10);

@ -10,13 +10,13 @@
#include "rocksdb/db.h"
#include <memory>
#include "db/memtable.h"
#include "db/column_family.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "util/logging.h"
#include "util/string_util.h"
@ -30,7 +30,7 @@ static std::string PrintContents(WriteBatch* b) {
Options options;
options.memtable_factory = factory;
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* mem =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);

@ -17,9 +17,10 @@
#include <limits>
#include <unordered_map>
#include "rocksdb/version.h"
#include "rocksdb/listener.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/version.h"
#include "rocksdb/write_buffer_manager.h"
#ifdef max
#undef max
@ -1138,6 +1139,22 @@ struct DBOptions {
// Default: 0 (disabled)
size_t db_write_buffer_size;
// The memory usage of memtable will report to this object. The same object
// can be passed into multiple DBs and it will track the sum of size of all
// the DBs. If the total size of all live memtables of all the DBs exceeds
// a limit, a flush will be triggered in the next DB to which the next write
// is issued.
//
// If the object is only passed to on DB, the behavior is the same as
// db_write_buffer_size. When write_buffer_manager is set, the value set will
// override db_write_buffer_size.
//
// This feature is disabled by default. Specify a non-zero value
// to enable it.
//
// Default: null
std::shared_ptr<WriteBufferManager> write_buffer_manager;
// Specify the file access pattern once a compaction is started.
// It will be applied to all input files of a compaction.
// Default: NORMAL

@ -7,20 +7,22 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBuffer is for managing memory allocation for one or more MemTables.
// WriteBufferManager is for managing memory allocation for one or more
// MemTables.
#pragma once
#include <atomic>
#include <cstddef>
namespace rocksdb {
class WriteBuffer {
class WriteBufferManager {
public:
explicit WriteBuffer(size_t _buffer_size)
explicit WriteBufferManager(size_t _buffer_size)
: buffer_size_(_buffer_size), memory_used_(0) {}
~WriteBuffer() {}
~WriteBufferManager() {}
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
@ -45,8 +47,7 @@ class WriteBuffer {
std::atomic<size_t> memory_used_;
// No copying allowed
WriteBuffer(const WriteBuffer&);
void operator=(const WriteBuffer&);
WriteBufferManager(const WriteBufferManager&) = delete;
WriteBufferManager& operator=(const WriteBufferManager&) = delete;
};
} // namespace rocksdb

@ -7,19 +7,19 @@
// calling c++ rocksdb::WriteBatch methods from Java side.
#include <memory>
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "include/org_rocksdb_WriteBatch.h"
#include "include/org_rocksdb_WriteBatch_Handler.h"
#include "rocksjni/portal.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "rocksdb/db.h"
#include "rocksdb/immutable_options.h"
#include "db/memtable.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/status.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "rocksjni/portal.h"
#include "rocksjni/writebatchhandlerjnicallback.h"
#include "table/scoped_arena_iterator.h"
#include "util/logging.h"
#include "util/testharness.h"

@ -9,17 +9,17 @@
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "include/org_rocksdb_WriteBatch.h"
#include "include/org_rocksdb_WriteBatch_Handler.h"
#include "include/org_rocksdb_WriteBatchTest.h"
#include "include/org_rocksdb_WriteBatchTestInternalHelper.h"
#include "include/org_rocksdb_WriteBatch_Handler.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "rocksjni/portal.h"
#include "table/scoped_arena_iterator.h"
#include "util/logging.h"
@ -42,7 +42,7 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(
rocksdb::InternalKeyComparator cmp(rocksdb::BytewiseComparator());
auto factory = std::make_shared<rocksdb::SkipListFactory>();
rocksdb::Options options;
rocksdb::WriteBuffer wb(options.db_write_buffer_size);
rocksdb::WriteBufferManager wb(options.db_write_buffer_size);
options.memtable_factory = factory;
rocksdb::MemTable* mem = new rocksdb::MemTable(
cmp, rocksdb::ImmutableCFOptions(options),

@ -20,7 +20,6 @@
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "memtable/stl_wrappers.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
@ -30,6 +29,7 @@
#include "rocksdb/perf_context.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/statistics.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_factory.h"
@ -400,10 +400,10 @@ uint64_t TableConstructor::cur_uniq_id_ = 1;
class MemTableConstructor: public Constructor {
public:
explicit MemTableConstructor(const Comparator* cmp, WriteBuffer* wb)
explicit MemTableConstructor(const Comparator* cmp, WriteBufferManager* wb)
: Constructor(cmp),
internal_comparator_(cmp),
write_buffer_(wb),
write_buffer_manager_(wb),
table_factory_(new SkipListFactory) {
options_.memtable_factory = table_factory_;
ImmutableCFOptions ioptions(options_);
@ -423,7 +423,7 @@ class MemTableConstructor: public Constructor {
ImmutableCFOptions mem_ioptions(ioptions);
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
MutableCFOptions(options_, mem_ioptions),
write_buffer_, kMaxSequenceNumber);
write_buffer_manager_, kMaxSequenceNumber);
memtable_->Ref();
int seq = 1;
for (const auto kv : kv_map) {
@ -445,7 +445,7 @@ class MemTableConstructor: public Constructor {
mutable Arena arena_;
InternalKeyComparator internal_comparator_;
Options options_;
WriteBuffer* write_buffer_;
WriteBufferManager* write_buffer_manager_;
MemTable* memtable_;
std::shared_ptr<SkipListFactory> table_factory_;
};
@ -941,7 +941,7 @@ class HarnessTest : public testing::Test {
ImmutableCFOptions ioptions_;
BlockBasedTableOptions table_options_ = BlockBasedTableOptions();
Constructor* constructor_;
WriteBuffer write_buffer_;
WriteBufferManager write_buffer_;
bool support_prev_;
bool only_support_prefix_seek_;
shared_ptr<InternalKeyComparator> internal_comparator_;
@ -2237,7 +2237,7 @@ TEST_F(MemTableTest, Simple) {
Options options;
options.memtable_factory = table_factory;
ImmutableCFOptions ioptions(options);
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
MemTable* memtable =
new MemTable(cmp, ioptions, MutableCFOptions(options, ioptions), &wb,
kMaxSequenceNumber);

@ -17,11 +17,11 @@
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/write_batch_internal.h"
#include "db/writebuffer.h"
#include "port/dirent.h"
#include "rocksdb/cache.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
#include "tools/ldb_cmd_impl.h"
#include "tools/sst_dump_tool_imp.h"
@ -868,7 +868,7 @@ void DumpManifestFile(std::string file, bool verbose, bool hex, bool json) {
options.db_paths.emplace_back("dummy", 0);
options.num_levels = 64;
WriteController wc(options.delayed_write_rate);
WriteBuffer wb(options.db_write_buffer_size);
WriteBufferManager wb(options.db_write_buffer_size);
VersionSet versions(dbname, &options, sopt, tc.get(), &wb, &wc);
Status s = versions.DumpManifest(options, file, verbose, hex, json);
if (!s.ok()) {
@ -1578,7 +1578,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits));
const InternalKeyComparator cmp(opt.comparator);
WriteController wc(opt.delayed_write_rate);
WriteBuffer wb(opt.db_write_buffer_size);
WriteBufferManager wb(opt.db_write_buffer_size);
VersionSet versions(db_path_, &opt, soptions, tc.get(), &wb, &wc);
std::vector<ColumnFamilyDescriptor> dummy;
ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,

@ -318,6 +318,7 @@ DBOptions::DBOptions(const Options& options)
stats_dump_period_sec(options.stats_dump_period_sec),
advise_random_on_open(options.advise_random_on_open),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
new_table_reader_for_compaction_inputs(
options.new_table_reader_for_compaction_inputs),

@ -197,6 +197,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
{offsetof(struct DBOptions, db_paths), sizeof(std::vector<DbPath>)},
{offsetof(struct DBOptions, db_log_dir), sizeof(std::string)},
{offsetof(struct DBOptions, wal_dir), sizeof(std::string)},
{offsetof(struct DBOptions, write_buffer_manager),
sizeof(std::shared_ptr<WriteBufferManager>)},
{offsetof(struct DBOptions, listeners),
sizeof(std::vector<std::shared_ptr<EventListener>>)},
{offsetof(struct DBOptions, row_cache), sizeof(std::shared_ptr<Cache>)},

Loading…
Cancel
Save