Respect cutoff timestamp during flush (#11599)

Summary:
Make flush respect the cutoff timestamp `full_history_ts_low` as much as possible for the user-defined timestamps in Memtables only feature. We achieve this by not proceeding with the actual flushing and instead rescheduling the same `FlushRequest`, so a follow-up flush job can repeat the check after some interval.

This approach doesn't work well for atomic flush, so this feature is currently not supported in combination with atomic flush. Furthermore, this approach requires a customized method to get the next immediately bigger user-defined timestamp, so it's currently limited to comparators that use uint64_t as the user-defined timestamp format. This support can be extended once we add such a customized method to `AdvancedColumnFamilyOptions`.

For non-atomic flush requests, at any given time a column family can have at most one `FlushRequest` in the `flush_queue_`; deduplication is done at `FlushRequest` enqueueing (`SchedulePendingFlush`) and dequeueing (`PopFirstFromFlushQueue`) time. We hold the DB mutex between when a `FlushRequest` is popped from the queue and when the same `FlushRequest` gets rescheduled, so no other `FlushRequest` with a higher `max_memtable_id` can be added to the `flush_queue_` and block us from re-enqueueing the same `FlushRequest`.

Flush is continued nevertheless if postponing it would risk entering write stall mode, e.g. because the accumulated write buffers would exceed the `max_write_buffer_number` setting. When this happens, the newest user-defined timestamp in the involved Memtables is tracked and used to increase `full_history_ts_low`, the inclusive cutoff timestamp at and above which RocksDB promises to keep all user-defined timestamps.
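
For reference, below is a minimal user-side sketch of how this feature is configured, modeled on the new tests; it is not part of this change. The `OpenWithRetainedUDT` helper and the hand-rolled `EncodeU64Ts` encoding (standing in for the `PutFixed64` utility used in the tests) are illustrative assumptions, and it assumes the RocksDB-provided `BytewiseComparatorWithU64Ts()` comparator:

```
// Sketch only: open a DB that retains UDTs in memtables and set the flush
// cutoff. Error handling is minimal.
#include <cstdint>
#include <string>

#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"

using ROCKSDB_NAMESPACE::ColumnFamilyHandle;
using ROCKSDB_NAMESPACE::DB;
using ROCKSDB_NAMESPACE::Options;
using ROCKSDB_NAMESPACE::Status;

// Stand-in for PutFixed64: encode a uint64_t user-defined timestamp as
// 8 little-endian bytes, the format expected by the u64 timestamp comparator.
std::string EncodeU64Ts(uint64_t ts) {
  std::string buf(sizeof(ts), '\0');
  for (size_t i = 0; i < sizeof(ts); ++i) {
    buf[i] = static_cast<char>((ts >> (8 * i)) & 0xff);
  }
  return buf;
}

Status OpenWithRetainedUDT(const std::string& dbname, uint64_t cutoff_ts,
                           DB** db) {
  Options options;
  options.create_if_missing = true;
  // Restrictions of this feature: no atomic flush, no concurrent memtable
  // writes, and the comparator must format UDTs as uint64_t.
  options.atomic_flush = false;
  options.allow_concurrent_memtable_write = false;
  options.comparator = ROCKSDB_NAMESPACE::BytewiseComparatorWithU64Ts();
  options.persist_user_defined_timestamps = false;
  Status s = DB::Open(options, dbname, db);
  if (!s.ok()) {
    return s;
  }
  ColumnFamilyHandle* cf = (*db)->DefaultColumnFamily();
  // Inclusive cutoff: flush is postponed (best effort) while the memtables
  // still hold user-defined timestamps >= this value.
  return (*db)->IncreaseFullHistoryTsLow(cf, EncodeU64Ts(cutoff_ts));
}
```

Writes then pass the timestamp alongside the key, e.g. `db->Put(WriteOptions(), cf, key, EncodeU64Ts(ts), value)`, as the new tests below do.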

Test plan:
```
./column_family_test --gtest_filter="*RetainUDT*"
./memtable_list_test --gtest_filter="*WithTimestamp*"
./flush_job_test --gtest_filter="*WithTimestamp*"
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11599

Reviewed By: ajkr

Differential Revision: D47561586

Pulled By: jowlyzhang

fbshipit-source-id: 9400445f983dd6eac489e9dd0fb5d9b99637fe89
Yu Zhang 1 year ago committed by Facebook GitHub Bot
parent 5c2a063c49
commit 4ea7b796b7
Files changed (changed lines per file):
1. db/column_family.cc (64)
2. db/column_family.h (6)
3. db/column_family_test.cc (203)
4. db/db_compaction_test.cc (6)
5. db/db_impl/db_impl.h (11)
6. db/db_impl/db_impl_compaction_flush.cc (153)
7. db/db_wal_test.cc (64)
8. db/db_with_timestamp_basic_test.cc (13)
9. db/flush_job.cc (61)
10. db/flush_job.h (18)
11. db/flush_job_test.cc (15)
12. db/memtable.cc (44)
13. db/memtable.h (27)
14. db/memtable_list.h (19)
15. db/memtable_list_test.cc (90)
16. db/repair_test.cc (13)
17. include/rocksdb/advanced_options.h (16)

@ -1376,6 +1376,33 @@ Status ColumnFamilyData::ValidateOptions(
}
}
const auto* ucmp = cf_options.comparator;
assert(ucmp);
if (ucmp->timestamp_size() > 0 &&
!cf_options.persist_user_defined_timestamps) {
if (db_options.atomic_flush) {
return Status::NotSupported(
"Not persisting user-defined timestamps feature is not supported"
"in combination with atomic flush.");
}
if (db_options.allow_concurrent_memtable_write) {
return Status::NotSupported(
"Not persisting user-defined timestamps feature is not supported"
" in combination with concurrent memtable write.");
}
const char* comparator_name = cf_options.comparator->Name();
size_t name_size = strlen(comparator_name);
const char* suffix = ".u64ts";
size_t suffix_size = strlen(suffix);
if (name_size <= suffix_size ||
strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
return Status::NotSupported(
"Not persisting user-defined timestamps"
"feature only support user-defined timestamps formatted as "
"uint64_t.");
}
}
if (cf_options.enable_blob_garbage_collection) {
if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
cf_options.blob_garbage_collection_age_cutoff > 1.0) {
@ -1515,6 +1542,43 @@ FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
return data_dirs_[path_id].get();
}
bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
uint64_t max_memtable_id) {
const Comparator* ucmp = user_comparator();
const size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
return false;
}
// If users set the `persist_user_defined_timestamps` flag to false, they
// should also set `full_history_ts_low` to indicate the range of
// user-defined timestamps to retain in memory. Otherwise, we do not
// explicitly postpone flush to retain UDTs.
const std::string& full_history_ts_low = GetFullHistoryTsLow();
if (full_history_ts_low.empty()) {
return false;
}
#ifndef NDEBUG
Slice last_table_newest_udt;
#endif /* !NDEBUG */
for (const Slice& table_newest_udt :
imm()->GetTablesNewestUDT(max_memtable_id)) {
assert(table_newest_udt.size() == full_history_ts_low.size());
assert(last_table_newest_udt.empty() ||
ucmp->CompareTimestamp(table_newest_udt, last_table_newest_udt) >=
0);
// Checking the newest UDT contained in MemTable with ascending ID up to
// `max_memtable_id`. MemTable with bigger ID will have newer UDT, return
// immediately on finding the first MemTable that needs postponing.
if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
return true;
}
#ifndef NDEBUG
last_table_newest_udt = table_newest_udt;
#endif /* !NDEBUG */
}
return false;
}
void ColumnFamilyData::RecoverEpochNumbers() {
assert(current_);
auto* vstorage = current_->storage_info();

@ -506,6 +506,12 @@ class ColumnFamilyData {
return full_history_ts_low_;
}
// REQUIRES: DB mutex held.
// Return true if flushing up to MemTables with ID `max_memtable_id`
// should be postponed to retain user-defined timestamps according to the
// user's setting. Called by background flush job.
bool ShouldPostponeFlushToRetainUDT(uint64_t max_memtable_id);
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
std::shared_ptr<CacheReservationManager>

@ -17,6 +17,7 @@
#include "options/options_parser.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -63,6 +64,9 @@ class ColumnFamilyTestBase : public testing::Test {
db_options_.create_if_missing = true;
db_options_.fail_if_options_file_error = true;
db_options_.env = env_;
}
void SetUp() override {
EXPECT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
}
@ -3380,6 +3384,205 @@ TEST(ColumnFamilyTest, ValidateMemtableKVChecksumOption) {
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
}
// Tests the flushing behavior of a column family to retain user-defined
// timestamps when `persist_user_defined_timestamps` is false.
class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
public:
ColumnFamilyRetainUDTTest() : ColumnFamilyTestBase(kLatestFormatVersion) {}
void SetUp() override {
db_options_.allow_concurrent_memtable_write = false;
column_family_options_.comparator =
test::BytewiseComparatorWithU64TsWrapper();
column_family_options_.persist_user_defined_timestamps = false;
ColumnFamilyTestBase::SetUp();
}
Status Put(int cf, const std::string& key, const std::string& ts,
const std::string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(ts),
Slice(value));
}
};
class TestTsComparator : public Comparator {
public:
TestTsComparator() : Comparator(8 /*ts_sz*/) {}
int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTs"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};
TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
Open();
ColumnFamilyOptions cf_options;
cf_options.persist_user_defined_timestamps = false;
TestTsComparator test_comparator;
cf_options.comparator = &test_comparator;
ColumnFamilyHandle* handle;
// Not persisting user-defined timestamps feature only supports user-defined
// timestamps formatted as uint64_t.
ASSERT_TRUE(
db_->CreateColumnFamily(cf_options, "pikachu", &handle).IsNotSupported());
Destroy();
// Not persisting user-defined timestamps feature doesn't work in combination
// with atomic flush.
db_options_.atomic_flush = true;
ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
// Not persisting user-defined timestamps feature doesn't work in combination
// with concurrent memtable write.
db_options_.atomic_flush = false;
db_options_.allow_concurrent_memtable_write = true;
ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
Close();
}
TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));
// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// All keys expired w.r.t. the configured `full_history_ts_low`; flush continues
// without the need for a re-schedule.
ASSERT_OK(Flush(0));
// `full_history_ts_low` stays unchanged after flush.
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
ASSERT_OK(Flush(0));
// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
// FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
ASSERT_OK(Flush(0));
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -5231,6 +5231,12 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
}
auto manual_compaction_thread = port::Thread([this]() {
// Write something to make the current Memtable non-empty, so an extra
// immutable Memtable will be created upon the manual flush requested by
// CompactRange, causing write stall mode to be entered because of the
// accumulation of write buffers due to the manual flush.
Random compact_rnd(301);
ASSERT_OK(Put(Key(0), compact_rnd.RandomString(1024)));
CompactRangeOptions cro;
cro.allow_write_stall = false;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

@ -2063,6 +2063,10 @@ class DBImpl : public DB {
// flush is considered complete.
std::unordered_map<ColumnFamilyData*, uint64_t>
cfd_to_max_mem_id_to_persist;
#ifndef NDEBUG
int reschedule_count = 1;
#endif /* !NDEBUG */
};
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
@ -2091,6 +2095,7 @@ class DBImpl : public DB {
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
bool* flush_rescheduled_to_retain_udt,
Env::Priority thread_pri);
bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
@ -2103,6 +2108,12 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer);
// Return true if the `FlushRequest` can be rescheduled to retain the UDT.
// Only true if the involved MemTables contain user-defined timestamps newer
// than the cutoff timestamp `full_history_ts_low`, and postponing the flush
// will not risk entering write stall mode.
bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);
// Schedule background tasks
Status StartPeriodicTaskScheduler();

@ -21,6 +21,7 @@
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/concurrent_task_limiter_impl.h"
namespace ROCKSDB_NAMESPACE {
@ -76,6 +77,40 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
return false;
}
bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
const FlushRequest& flush_req) {
mutex_.AssertHeld();
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd = flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
uint64_t max_memtable_id =
flush_req.cfd_to_max_mem_id_to_persist.begin()->second;
if (cfd->IsDropped() ||
!cfd->ShouldPostponeFlushToRetainUDT(max_memtable_id)) {
return false;
}
// Check if holding off the flush will cause entering write stall mode.
// A write stall entered because of the accumulation of write buffers can be
// alleviated if we continue with the flush instead of postponing it.
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
// Taking the status of the active Memtable into consideration so that we are
// not just checking if DB is currently already in write stall mode.
int mem_to_flush = cfd->mem()->ApproximateMemoryUsageFast() >=
cfd->mem()->write_buffer_size() / 2
? 1
: 0;
WriteStallCondition write_stall =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + mem_to_flush, /*num_l0_files=*/0,
/*num_compaction_needed_bytes=*/0, mutable_cf_options,
*cfd->ioptions())
.first;
if (write_stall != WriteStallCondition::kNormal) {
return false;
}
return true;
}
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
VersionEdit* synced_wals) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
@ -2506,8 +2541,11 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
// check whether one extra immutable memtable or an extra L0 file would
// cause write stalling mode to be entered. It could still enter stall
// mode due to pending compaction bytes, but that's less common
// No extra immutable Memtable will be created if the current Memtable is
// empty.
int mem_to_flush = cfd->mem()->IsEmpty() ? 0 : 1;
write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed() + 1,
cfd->imm()->NumNotFlushed() + mem_to_flush,
vstorage->l0_delay_trigger_count() + 1,
vstorage->estimated_compaction_needed_bytes(),
mutable_cf_options, *cfd->ioptions())
@ -2945,6 +2983,7 @@ void DBImpl::UnscheduleFlushCallback(void* arg) {
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
bool* flush_rescheduled_to_retain_udt,
Env::Priority thread_pri) {
mutex_.AssertHeld();
@ -2970,12 +3009,43 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
autovector<ColumnFamilyData*> column_families_not_to_flush;
while (!flush_queue_.empty()) {
// This cfd is already referenced
auto [flush_reason, cfd_to_max_mem_id_to_persist] =
PopFirstFromFlushQueue();
FlushRequest flush_req = PopFirstFromFlushQueue();
FlushReason flush_reason = flush_req.flush_reason;
if (!immutable_db_options_.atomic_flush &&
ShouldRescheduleFlushRequestToRetainUDT(flush_req)) {
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd =
flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
if (cfd->UnrefAndTryDelete()) {
return Status::OK();
}
ROCKS_LOG_BUFFER(log_buffer,
"FlushRequest for column family %s is re-scheduled to "
"retain user-defined timestamps.",
cfd->GetName().c_str());
// Reschedule the `FlushRequest` as is without checking dropped column
// family etc. The follow-up job will do the check anyways, so save the
// duplication. Column family is deduplicated by `SchedulePendingFlush` at
// flush request enqueueing time and by `PopFirstFromFlushQueue` at
// dequeueing time.
// This flush request is rescheduled right after it's popped from the
// queue while the db mutex is held, so there should be no other
// FlushRequest for the same column family with higher `max_memtable_id`
// in the queue to block the reschedule from succeeding.
#ifndef NDEBUG
flush_req.reschedule_count += 1;
#endif /* !NDEBUG */
SchedulePendingFlush(flush_req);
*reason = flush_reason;
*flush_rescheduled_to_retain_udt = true;
return Status::TryAgain();
}
superversion_contexts.clear();
superversion_contexts.reserve(cfd_to_max_mem_id_to_persist.size());
superversion_contexts.reserve(
flush_req.cfd_to_max_mem_id_to_persist.size());
for (const auto& [cfd, max_memtable_id] : cfd_to_max_mem_id_to_persist) {
for (const auto& [cfd, max_memtable_id] :
flush_req.cfd_to_max_mem_id_to_persist) {
if (cfd->GetMempurgeUsed()) {
// If imm() contains silent memtables (e.g.: because
// MemPurge was activated), requesting a flush will
@ -2992,7 +3062,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_flush_args.emplace_back(cfd, max_memtable_id,
&(superversion_contexts.back()), flush_reason);
}
if (!bg_flush_args.empty()) {
// `MaybeScheduleFlushOrCompaction` schedules as many `BackgroundCallFlush`
// jobs as the number of `FlushRequest` in the `flush_queue_`, a.k.a
// `unscheduled_flushes_`. So it's sufficient to make each `BackgroundFlush`
// handle one `FlushRequest` and each have a Status returned.
if (!bg_flush_args.empty() || !column_families_not_to_flush.empty()) {
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundFlush:CheckFlushRequest:cb",
const_cast<int*>(&flush_req.reschedule_count));
break;
}
}
@ -3054,11 +3130,20 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
FlushReason reason;
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
&reason, thread_pri);
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
reason != FlushReason::kErrorRecovery) {
bool flush_rescheduled_to_retain_udt = false;
Status s =
BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason,
&flush_rescheduled_to_retain_udt, thread_pri);
if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
TEST_SYNC_POINT_CALLBACK("DBImpl::AfterRetainUDTReschedule:cb", nullptr);
immutable_db_options_.clock->SleepForMicroseconds(
100000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped() &&
reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to
// chew up resources for failed flushes for the duration of
@ -3079,29 +3164,33 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// If flush failed, we want to delete all temporary files that we might have
// created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped());
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
// There is no need to do this cleanup if the flush job is rescheduled
// to retain user-defined timestamps because the job doesn't get to the
// stage of actually flushing the MemTables.
if (!flush_rescheduled_to_retain_udt) {
// If flush failed, we want to delete all temporary files that we might
// have created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
!s.IsColumnFamilyDropped());
// delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
mutex_.Lock();
}
job_context.Clean();
mutex_.Lock();
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
}
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
assert(num_running_flushes_ > 0);
num_running_flushes_--;

@ -324,10 +324,14 @@ class DBWALTestWithTimestamp
}
Status CreateAndReopenWithCFWithTs(const std::vector<std::string>& cfs,
const Options& options,
Options& ts_options,
bool avoid_flush_during_recovery = false) {
CreateColumnFamilies(cfs, options);
return ReopenColumnFamiliesWithTs(cfs, options,
Options default_options = CurrentOptions();
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
DestroyAndReopen(default_options);
CreateColumnFamilies(cfs, ts_options);
return ReopenColumnFamiliesWithTs(cfs, ts_options,
avoid_flush_during_recovery);
}
@ -336,6 +340,8 @@ class DBWALTestWithTimestamp
bool avoid_flush_during_recovery = false) {
Options default_options = CurrentOptions();
default_options.create_if_missing = false;
default_options.allow_concurrent_memtable_write =
persist_udt_ ? true : false;
default_options.avoid_flush_during_recovery = avoid_flush_during_recovery;
ts_options.create_if_missing = false;
@ -370,12 +376,11 @@ class DBWALTestWithTimestamp
TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// Set up the option that enables user-defined timestamp size.
std::string ts1 = Timestamp(1, 0);
const size_t kTimestampSize = ts1.size();
TestComparator test_cmp(kTimestampSize);
std::string ts1;
PutFixed64(&ts1, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
// Test that user-defined timestamps are recovered from WAL regardless of
// the value of this flag because UDTs are saved in WAL nonetheless.
// We however need to explicitly disable flush during recovery by setting
@ -405,14 +410,16 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
// Write more value versions for key "foo" and "bar" before and after second
// reopen.
std::string ts2 = Timestamp(2, 0);
std::string ts2;
PutFixed64(&ts2, 2);
ASSERT_OK(Put(1, "bar", ts2, "v2"));
ASSERT_OK(Put(1, "foo", ts2, "v3"));
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options,
avoid_flush_during_recovery));
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U);
std::string ts3 = Timestamp(3, 0);
std::string ts3;
PutFixed64(&ts3, 3);
ASSERT_OK(Put(1, "foo", ts3, "v4"));
// Do a timestamped read with ts1 after third reopen.
@ -435,11 +442,26 @@ TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) {
} while (ChangeWalOptions());
}
class TestTsSzComparator : public Comparator {
public:
explicit TestTsSzComparator(size_t ts_sz) : Comparator(ts_sz) {}
int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTsSzComparator.u64ts"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};
TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// Set up the option that enables user-defined timestamp size.
std::string ts = Timestamp(1, 0);
const size_t kTimestampSize = ts.size();
TestComparator test_cmp(kTimestampSize);
std::string ts;
PutFixed16(&ts, 1);
TestTsSzComparator test_cmp(2);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
@ -452,11 +474,11 @@ TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
// In real use cases, switching to a different user comparator is prohibited
// by a sanity check during DB open that does a user comparator name
// comparison. This test mocked and bypassed that sanity check because the
// before and after user comparator are both named "TestComparator". This is
// to test the user-defined timestamp recovery logic for WAL files have
// the intended consistency check.
// before and after user comparator are both named "TestTsSzComparator.u64ts".
// This is to test that the user-defined timestamp recovery logic for WAL
// files has the intended consistency check.
// `HandleWriteBatchTimestampSizeDifference` in udt_util.h has more details.
TestComparator diff_test_cmp(kTimestampSize + 1);
TestTsSzComparator diff_test_cmp(3);
ts_options.comparator = &diff_test_cmp;
ASSERT_TRUE(
ReopenColumnFamiliesWithTs({"pikachu"}, ts_options).IsInvalidArgument());
@ -464,13 +486,13 @@ TEST_P(DBWALTestWithTimestamp, RecoverInconsistentTimestamp) {
TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) {
// Set up the option that enables user defined timestamp size.
std::string min_ts = Timestamp(0, 0);
std::string write_ts = Timestamp(1, 0);
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
std::string min_ts;
std::string write_ts;
PutFixed64(&min_ts, 0);
PutFixed64(&write_ts, 1);
Options ts_options;
ts_options.create_if_missing = true;
ts_options.comparator = &test_cmp;
ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ts_options.persist_user_defined_timestamps = persist_udt_;
std::string smallest_ukey_without_ts = "baz";

@ -3289,15 +3289,18 @@ TEST_P(HandleFileBoundariesTest, ConfigurePersistUdt) {
options.env = env_;
// Write a timestamp that is not the min timestamp to help test the behavior
// of flag `persist_user_defined_timestamps`.
std::string write_ts = Timestamp(1, 0);
std::string min_ts = Timestamp(0, 0);
std::string write_ts;
std::string min_ts;
PutFixed64(&write_ts, 1);
PutFixed64(&min_ts, 0);
std::string smallest_ukey_without_ts = "bar";
std::string largest_ukey_without_ts = "foo";
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
bool persist_udt = test::ShouldPersistUDT(GetParam());
options.persist_user_defined_timestamps = persist_udt;
if (!persist_udt) {
options.allow_concurrent_memtable_write = false;
}
DestroyAndReopen(options);
ASSERT_OK(

@ -189,6 +189,10 @@ void FlushJob::PickMemTable() {
return;
}
// Track effective cutoff user-defined timestamp during flush if
// user-defined timestamps can be stripped.
GetEffectiveCutoffUDTForPickedMemTables();
ReportFlushInputSize(mems_);
// entries mems are (implicitly) sorted in ascending order by their created
@ -294,6 +298,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
s = Status::ShutdownInProgress("Database shutdown");
}
if (s.ok()) {
s = MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
}
if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
} else if (write_manifest_) {
@ -1097,4 +1105,57 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
return info;
}
void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
db_mutex_->AssertHeld();
assert(pick_memtable_called);
const auto* ucmp = cfd_->internal_comparator().user_comparator();
assert(ucmp);
const size_t ts_sz = ucmp->timestamp_size();
if (db_options_.atomic_flush || ts_sz == 0 ||
cfd_->ioptions()->persist_user_defined_timestamps) {
return;
}
for (MemTable* m : mems_) {
Slice table_newest_udt = m->GetNewestUDT();
// The picked Memtables should have ascending ID, and should have
// non-decreasing newest user-defined timestamps.
if (!cutoff_udt_.empty()) {
assert(table_newest_udt.size() == cutoff_udt_.size());
assert(ucmp->CompareTimestamp(table_newest_udt, cutoff_udt_) >= 0);
cutoff_udt_.clear();
}
cutoff_udt_.assign(table_newest_udt.data(), table_newest_udt.size());
}
}
Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() {
db_mutex_->AssertHeld();
const auto* ucmp = cfd_->user_comparator();
assert(ucmp);
const std::string& full_history_ts_low = cfd_->GetFullHistoryTsLow();
// Update full_history_ts_low to right above cutoff udt only if that would
// increase it.
if (cutoff_udt_.empty() ||
(!full_history_ts_low.empty() &&
ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) {
return Status::OK();
}
Slice cutoff_udt_slice = cutoff_udt_;
uint64_t cutoff_udt_ts = 0;
bool format_res = GetFixed64(&cutoff_udt_slice, &cutoff_udt_ts);
assert(format_res);
(void)format_res;
std::string new_full_history_ts_low;
// TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an
// operation to get the next immediately larger user-defined timestamp to
// expand this feature to other user-defined timestamp formats.
PutFixed64(&new_full_history_ts_low, cutoff_udt_ts + 1);
VersionEdit edit;
edit.SetColumnFamily(cfd_->GetID());
edit.SetFullHistoryTsLow(new_full_history_ts_low);
return versions_->LogAndApply(cfd_, *cfd_->GetLatestMutableCFOptions(),
ReadOptions(), &edit, db_mutex_,
output_file_directory_);
}
} // namespace ROCKSDB_NAMESPACE

@ -127,6 +127,20 @@ class FlushJob {
Env::IOPriority GetRateLimiterPriorityForWrite();
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
// Require db_mutex held.
// Called only when the UDT feature is enabled and the
// `persist_user_defined_timestamps` flag is false, because we refrain
// from flushing as long as there are still UDTs in a memtable that haven't
// expired w.r.t. `full_history_ts_low`. However, flush is continued if there
// is a risk of entering write stall mode. In that case, we need
// to track the effective cutoff timestamp below which all the UDTs are
// removed because of flush, and use it to increase `full_history_ts_low` if
// the effective cutoff timestamp is newer. See
// `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
void GetEffectiveCutoffUDTForPickedMemTables();
Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
const std::string& dbname_;
const std::string db_id_;
const std::string db_session_id_;
@ -195,6 +209,10 @@ class FlushJob {
// db mutex
const SeqnoToTimeMapping& db_impl_seqno_time_mapping_;
SeqnoToTimeMapping seqno_to_time_mapping_;
// Keeps track of the newest user-defined timestamp for this flush job if
// `persist_user_defined_timestamps` flag is false.
std::string cutoff_udt_;
};
} // namespace ROCKSDB_NAMESPACE

@ -654,6 +654,10 @@ class FlushJobTimestampTest
installed_file_meta->smallest.Encode());
ASSERT_EQ(expected_largest.Encode(), installed_file_meta->largest.Encode());
}
void CheckFullHistoryTsLow(ColumnFamilyData* cfd,
const std::string& expected_full_history_ts_low) {
ASSERT_EQ(expected_full_history_ts_low, cfd->GetFullHistoryTsLow());
}
};
TEST_P(FlushJobTimestampTest, AllKeysExpired) {
@ -684,6 +688,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {
EventLogger event_logger(db_options_.info_log.get());
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
cfd->SetFullHistoryTsLow(full_history_ts_low);
FlushJob flush_job(
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
@ -714,6 +719,7 @@ TEST_P(FlushJobTimestampTest, AllKeysExpired) {
}
InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
CheckFileMetaData(cfd, ikey, ikey, &fmeta);
CheckFullHistoryTsLow(cfd, full_history_ts_low);
}
job_context.Clean();
@ -744,6 +750,7 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
EventLogger event_logger(db_options_.info_log.get());
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 0);
cfd->SetFullHistoryTsLow(full_history_ts_low);
FlushJob flush_job(
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
std::numeric_limits<uint64_t>::max() /* memtable_id */, env_options_,
@ -765,6 +772,7 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
std::string ukey = test::EncodeInt(0);
std::string smallest_key;
std::string largest_key;
std::string expected_full_history_ts_low;
if (!persist_udt_) {
// When `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` flag
// is set to false. The user-defined timestamp is stripped from user key
@ -772,14 +780,21 @@ TEST_P(FlushJobTimestampTest, NoKeyExpired) {
// timestamp, which is hardcoded to be all zeros for now.
smallest_key = ukey + test::EncodeInt(0);
largest_key = ukey + test::EncodeInt(0);
// When not all keys have expired and `persist_user_defined_timestamps` is
// false, UDTs will be removed during flush, so `full_history_ts_low` should
// be automatically increased to above the effective cutoff UDT of the
// flush.
PutFixed64(&expected_full_history_ts_low, curr_ts_.fetch_add(1));
} else {
smallest_key =
ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1);
largest_key = ukey + test::EncodeInt(kStartTs);
expected_full_history_ts_low = full_history_ts_low;
}
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
CheckFileMetaData(cfd, smallest, largest, &fmeta);
CheckFullHistoryTsLow(cfd, expected_full_history_ts_low);
}
job_context.Clean();
ASSERT_TRUE(to_delete.empty());

@ -143,6 +143,10 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
new_cache.get()),
std::memory_order_relaxed);
}
const Comparator* ucmp = cmp.user_comparator();
assert(ucmp);
ts_sz_ = ucmp->timestamp_size();
persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps;
}
MemTable::~MemTable() {
@ -357,7 +361,8 @@ class MemTableIterator : public InternalIterator {
!mem.GetImmutableMemTableOptions()->inplace_update_support),
protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
status_(Status::OK()),
logger_(mem.moptions_.info_log) {
logger_(mem.moptions_.info_log),
ts_sz_(mem.ts_sz_) {
if (use_range_del_table) {
iter_ = mem.range_del_table_->GetIterator(arena);
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
@ -400,8 +405,7 @@ class MemTableIterator : public InternalIterator {
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_) {
// iterator should only use prefix bloom filter
auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
if (prefix_extractor_->InDomain(user_k_without_ts)) {
if (!bloom_->MayContain(
prefix_extractor_->Transform(user_k_without_ts))) {
@ -421,8 +425,7 @@ class MemTableIterator : public InternalIterator {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
if (bloom_) {
auto ts_sz = comparator_.comparator.user_comparator()->timestamp_size();
Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz));
Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
if (prefix_extractor_->InDomain(user_k_without_ts)) {
if (!bloom_->MayContain(
prefix_extractor_->Transform(user_k_without_ts))) {
@ -512,6 +515,7 @@ class MemTableIterator : public InternalIterator {
uint32_t protection_bytes_per_key_;
Status status_;
Logger* logger_;
size_t ts_sz_;
void VerifyEntryChecksum() {
if (protection_bytes_per_key_ > 0 && Valid()) {
@ -625,8 +629,7 @@ Status MemTable::VerifyEncodedEntry(Slice encoded,
if (!GetVarint32(&encoded, &ikey_len)) {
return Status::Corruption("Unable to parse internal key length");
}
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
if (ikey_len < 8 + ts_sz) {
if (ikey_len < 8 + ts_sz_) {
return Status::Corruption("Internal key length too short");
}
if (ikey_len > encoded.size()) {
@ -725,8 +728,7 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
}
}
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz);
Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_);
if (!allow_concurrent) {
// Extract prefix for insert with hint.
@ -776,6 +778,9 @@ Status MemTable::Add(SequenceNumber s, ValueType type,
assert(first_seqno_.load() >= earliest_seqno_.load());
}
assert(post_process_info == nullptr);
// TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent`
// is true.
MaybeUpdateNewestUDT(key_slice);
UpdateFlushState();
} else {
bool res = (hint == nullptr)
@ -1286,8 +1291,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value,
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
bool may_contain = true;
size_t ts_sz = GetInternalKeyComparator().user_comparator()->timestamp_size();
Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz);
Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_);
bool bloom_checked = false;
if (bloom_filter_) {
// when both memtable_whole_key_filtering and prefix_extractor_ are set,
@ -1672,4 +1676,22 @@ uint64_t MemTable::GetMinLogContainingPrepSection() {
return min_prep_log_referenced_.load();
}
void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
return;
}
const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_);
if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) {
newest_udt_ = udt;
}
}
const Slice& MemTable::GetNewestUDT() const {
// This path should not be invoked for MemTables that do not enable the UDT
// in Memtable only feature.
assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
return newest_udt_;
}
} // namespace ROCKSDB_NAMESPACE

@ -353,6 +353,10 @@ class MemTable {
return data_size_.load(std::memory_order_relaxed);
}
size_t write_buffer_size() const {
return write_buffer_size_.load(std::memory_order_relaxed);
}
// Dynamically change the memtable's capacity. If set below the current usage,
// the next key added will trigger a flush. Can only increase size when
// memtable prefix bloom is disabled, since we can't easily allocate more
@ -527,6 +531,14 @@ class MemTable {
}
}
// Get the newest user-defined timestamp contained in this MemTable. See
// `newest_udt_` for what "newer" means. This method should only be invoked
// for a MemTable that has the user-defined timestamp feature enabled and
// `persist_user_defined_timestamps` set to false. The tracked newest UDT is
// used by the background flush job to help check the MemTable's
// eligibility for flush.
const Slice& GetNewestUDT() const;
// Returns Corruption status if verification fails.
static Status VerifyEntryChecksum(const char* entry,
uint32_t protection_bytes_per_key,
@ -617,6 +629,19 @@ class MemTable {
// Flush job info of the current memtable.
std::unique_ptr<FlushJobInfo> flush_job_info_;
// Size in bytes for the user-defined timestamps.
size_t ts_sz_;
// Whether to persist user-defined timestamps
bool persist_user_defined_timestamps_;
// Newest user-defined timestamp contained in this MemTable. For ts1 and ts2,
// if Comparator::CompareTimestamp(ts1, ts2) > 0, ts1 is considered newer than
// ts2. We track this field for a MemTable if its column family has the UDT
// feature enabled and the `persist_user_defined_timestamps` flag is false.
// Otherwise, this field just contains an empty Slice.
Slice newest_udt_;
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();
@ -653,6 +678,8 @@ class MemTable {
void UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
const Slice& key, const Slice& value, ValueType type,
SequenceNumber s, char* checksum_ptr);
void MaybeUpdateNewestUDT(const Slice& user_key);
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -382,6 +382,25 @@ class MemTableList {
return memlist.front()->GetID();
}
// DB mutex held.
// Gets the newest user-defined timestamp for the Memtables in ascending ID
// order, up to the `max_memtable_id`. Used by background flush job
// to check Memtables' eligibility for flush w.r.t retaining UDTs.
std::vector<Slice> GetTablesNewestUDT(uint64_t max_memtable_id) {
std::vector<Slice> newest_udts;
auto& memlist = current_->memlist_;
// Iterating through the memlist starting at the end, `newest_udts` is filled
// with the newest UDTs of memtables in increasing MemTable ID order.
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it;
if (m->GetID() > max_memtable_id) {
break;
}
newest_udts.push_back(m->GetNewestUDT());
}
return newest_udts;
}
void AssignAtomicFlushSeq(const SequenceNumber& seq) {
const auto& memlist = current_->memlist_;
// Scan the memtable list from new to old

@ -43,6 +43,9 @@ class MemTableListTest : public testing::Test {
// Open DB only with default column family
ColumnFamilyOptions cf_options;
std::vector<ColumnFamilyDescriptor> cf_descs;
if (udt_enabled_) {
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
}
cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
EXPECT_OK(s);
@ -200,6 +203,9 @@ class MemTableListTest : public testing::Test {
nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
}
protected:
bool udt_enabled_ = false;
};
TEST_F(MemTableListTest, Empty) {
@ -868,7 +874,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
to_delete.clear();
}
TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
autovector<MemTableList*> lists;
autovector<uint32_t> cf_ids;
autovector<const MutableCFOptions*> options_list;
@ -880,7 +886,7 @@ TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
ASSERT_TRUE(to_delete.empty());
}
TEST_F(MemTableListTest, AtomicFlusTest) {
TEST_F(MemTableListTest, AtomicFlushTest) {
const int num_cfs = 3;
const int num_tables_per_cf = 2;
SequenceNumber seq = 1;
@ -1028,6 +1034,86 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
}
}
class MemTableListWithTimestampTest : public MemTableListTest {
public:
MemTableListWithTimestampTest() : MemTableListTest() {}
void SetUp() override { udt_enabled_ = true; }
};
TEST_F(MemTableListWithTimestampTest, GetTableNewestUDT) {
const int num_tables = 3;
const int num_entries = 5;
SequenceNumber seq = 1;
auto factory = std::make_shared<SkipListFactory>();
options.memtable_factory = factory;
options.persist_user_defined_timestamps = false;
ImmutableOptions ioptions(options);
const Comparator* ucmp = test::BytewiseComparatorWithU64TsWrapper();
InternalKeyComparator cmp(ucmp);
WriteBufferManager wb(options.db_write_buffer_size);
// Create MemTableList
int min_write_buffer_number_to_merge = 1;
int max_write_buffer_number_to_maintain = 4;
int64_t max_write_buffer_size_to_maintain =
4 * static_cast<int>(options.write_buffer_size);
MemTableList list(min_write_buffer_number_to_merge,
max_write_buffer_number_to_maintain,
max_write_buffer_size_to_maintain);
// Create some MemTables
uint64_t memtable_id = 0;
std::vector<MemTable*> tables;
MutableCFOptions mutable_cf_options(options);
uint64_t current_ts = 0;
autovector<MemTable*> to_delete;
std::vector<std::string> newest_udts;
std::string key;
std::string write_ts;
for (int i = 0; i < num_tables; i++) {
MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
kMaxSequenceNumber, 0 /* column_family_id */);
mem->SetID(memtable_id++);
mem->Ref();
std::string value;
MergeContext merge_context;
for (int j = 0; j < num_entries; j++) {
key = "key1";
write_ts.clear();
PutFixed64(&write_ts, current_ts);
key.append(write_ts);
ASSERT_OK(mem->Add(++seq, kTypeValue, key, std::to_string(i),
nullptr /* kv_prot_info */));
current_ts++;
}
tables.push_back(mem);
list.Add(tables.back(), &to_delete);
newest_udts.push_back(write_ts);
}
ASSERT_EQ(num_tables, list.NumNotFlushed());
ASSERT_TRUE(list.IsFlushPending());
std::vector<Slice> tables_newest_udts = list.GetTablesNewestUDT(num_tables);
ASSERT_EQ(newest_udts.size(), tables_newest_udts.size());
for (size_t i = 0; i < tables_newest_udts.size(); i++) {
const Slice& table_newest_udt = tables_newest_udts[i];
const Slice expected_newest_udt = newest_udts[i];
ASSERT_EQ(expected_newest_udt, table_newest_udt);
}
list.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
delete m;
}
to_delete.clear();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -365,12 +365,15 @@ TEST_P(RepairTestWithTimestamp, UnflushedSst) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
std::string min_ts = Timestamp(0, 0);
std::string write_ts = Timestamp(1, 0);
const size_t kTimestampSize = write_ts.size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
std::string min_ts;
std::string write_ts;
PutFixed64(&min_ts, 0);
PutFixed64(&write_ts, 1);
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
options.persist_user_defined_timestamps = persist_udt;
if (!persist_udt) {
options.allow_concurrent_memtable_write = false;
}
options.paranoid_file_checks = paranoid_file_checks;
ColumnFamilyOptions cf_options(options);

@ -1155,10 +1155,20 @@ struct AdvancedColumnFamilyOptions {
// while set this flag to be `false`: user keys in the newly generated SST
// files are of the same format as the existing SST files.
//
// Currently only user comparators that format user-defined timestamps as
// uint64_t via the RocksDB-provided comparator `ComparatorWithU64TsImpl`
// are supported.
//
// When setting this flag to `false`, users should also call
// `DB::IncreaseFullHistoryTsLow` to set a cutoff timestamp for flush. RocksDB
// refrains from flushing a memtable with data still above
// the cutoff timestamp with best effort. Users can do user-defined
// the cutoff timestamp with best effort. If this cutoff timestamp is not set,
// flushing continues normally.
// NOTE: in order for the cutoff timestamp to work properly, users of this
// feature need to ensure that writes to a column family use globally
// non-decreasing user-defined timestamps.
//
// Users can do user-defined
// multi-versioned read above the cutoff timestamp. When users try to read
// below the cutoff timestamp, an error will be returned.
//
@ -1169,6 +1179,10 @@ struct AdvancedColumnFamilyOptions {
// downgrade or toggling on / off the user-defined timestamp feature on a
// column family.
//
// Note that setting this flag to false is not supported in combination with
// atomic flush, or concurrent memtable write enabled by
// `allow_concurrent_memtable_write`.
//
// Default: true (user-defined timestamps are persisted)
// Not dynamically changeable, change it requires db restart and
// only compatible changes are allowed.
