DB to only flush the column family with the largest memtable while option.db_write_buffer_size is hit

Summary: When option.db_write_buffer_size is hit, we currently flush all column families. Move to flush the column family with the largest active memt table instead. In this way, we can avoid too many small files in some cases.

Test Plan: Modify test DBTest.SharedWriteBuffer to work with the updated behavior

Reviewers: kradhakrishnan, yhchiang, rven, anthony, IslamAbdelRahman, igor

Reviewed By: igor

Subscribers: march, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D51291
main
sdong 9 years ago
parent 4a009f9172
commit db320b1b82
  1. 2
      HISTORY.md
  2. 25
      db/db_impl.cc
  3. 82
      db/db_test.cc

@ -8,6 +8,8 @@
* Introduce CheckOptionsCompatibility() in rocksdb/utilities/options_util.h. This function checks whether the input set of options is able to open the specified DB successfully.
### Public API Changes
* When options.db_write_buffer_size triggers, only the column family with the largest column family size will be flushed, not all the column families.
## 4.2.0 (11/9/2015)
### New Features
* Introduce CreateLoggerFromOptions(), this function create a Logger for provided DBOptions.

@ -3920,26 +3920,37 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing all column families. Write buffer is using %" PRIu64
" bytes out of a total of %" PRIu64 ".",
"Flushing column family with largest mem table size. Write buffer is "
"using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
write_buffer_.memory_usage(), write_buffer_.buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
ColumnFamilyData* largest_cfd = nullptr;
size_t largest_cfd_size = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) {
status = SwitchMemtable(cfd, &context);
if (!status.ok()) {
break;
// We only consider active mem table, hoping immutable memtable is
// already in the process of flushing.
size_t cfd_size = cfd->mem()->ApproximateMemoryUsage();
if (largest_cfd == nullptr || cfd_size > largest_cfd_size) {
largest_cfd = cfd;
largest_cfd_size = cfd_size;
}
cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd);
}
}
if (largest_cfd != nullptr) {
status = SwitchMemtable(largest_cfd, &context);
if (status.ok()) {
largest_cfd->imm()->FlushRequested();
SchedulePendingFlush(largest_cfd);
MaybeScheduleFlushOrCompaction();
}
}
}
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;

@ -4936,7 +4936,7 @@ TEST_F(DBTest, SharedWriteBuffer) {
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
// Trigger a flush on every CF
// Trigger a flush on CF "nikitich"
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
ASSERT_OK(Put(3, Key(1), DummyString(90000)));
@ -4948,65 +4948,107 @@ TEST_F(DBTest, SharedWriteBuffer) {
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// "dobrynia": 20KB
// Flush 'dobrynia'
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(70000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(1));
}
// Flush 'dobrynia' and 'nikitich'
ASSERT_OK(Put(2, Key(2), DummyString(50000)));
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(3), DummyString(20000)));
// "nikitich" still has has data of 80KB
// Inserting Data in "dobrynia" triggers "nikitich" flushing.
ASSERT_OK(Put(3, Key(2), DummyString(40000)));
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(2));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// Make 'dobrynia' and 'nikitich' both take up 40% of space
// When 'pikachu' puts us over 100%, all 3 flush.
ASSERT_OK(Put(2, Key(2), DummyString(40000)));
// "dobrynia" still has 40KB
ASSERT_OK(Put(1, Key(2), DummyString(20000)));
ASSERT_OK(Put(0, Key(1), DummyString(10000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers no flush
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
// "default": 10KB, "pikachu": 20KB, "dobrynia": 40KB
ASSERT_OK(Put(1, Key(2), DummyString(40000)));
ASSERT_OK(Put(0, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
// This should triggers flush of "pikachu"
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(0));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(3));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(3));
static_cast<uint64_t>(2));
}
// Some remaining writes so 'default' and 'nikitich' flush on closure.
// "default": 10KB, "dobrynia": 40KB
// Some remaining writes so 'default', 'dobrynia' and 'nikitich' flush on
// closure.
ASSERT_OK(Put(3, Key(1), DummyString(1)));
ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"},
options);
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(2));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"),
static_cast<uint64_t>(2));
static_cast<uint64_t>(1));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"),
static_cast<uint64_t>(3));
static_cast<uint64_t>(2));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(4));
static_cast<uint64_t>(3));
}
}
#endif // ROCKSDB_LITE

Loading…
Cancel
Save