Add full_history_ts_low to column family (#7740)

Summary:
Following https://github.com/facebook/rocksdb/issues/7655 and https://github.com/facebook/rocksdb/issues/7657, this PR adds `full_history_ts_low_` to `ColumnFamilyData`.
`ColumnFamilyData::full_history_ts_low_` will be used to create `FlushJob` and `CompactionJob`.

`ColumnFamilyData::full_history_ts_low` is persisted to the MANIFEST file. An application can only
increase its value. Consider the following case:

>
> The database has a key at ts=950. `full_history_ts_low` is first set to 1000, and then a GC is triggered
> and cleans up all data older than 1000. If the application sets `full_history_ts_low` to 900 afterwards,
> and tries to read at ts=960, the key at 950 is not seen. From the perspective of the read, the result
> is hard to reason. For simplicity, we just do now allow decreasing full_history_ts_low for now.
>

During recovery, the value of `full_history_ts_low` is restored for each column family if applicable. Note that
version edits in the MANIFEST file for the same column family may have `full_history_ts_low` unsorted due
to the potential interleaving of `LogAndApply` calls. Only the max will be used to restore the state of the
column family.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7740

Test Plan: make check

Reviewed By: ltamasi

Differential Revision: D25296217

Pulled By: riversand963

fbshipit-source-id: 24acda1df8262cd7cfdc6ce7b0ec56438abe242a
main
Yanqin Jin 4 years ago committed by Facebook GitHub Bot
parent e34b2e9f41
commit eee0af9af1
  1. 17
      db/column_family.h
  2. 10
      db/db_impl/db_impl_compaction_flush.cc
  3. 24
      db/version_edit.cc
  4. 13
      db/version_edit.h
  5. 4
      db/version_edit_handler.cc
  6. 10
      db/version_edit_test.cc
  7. 26
      db/version_set.cc
  8. 5
      db/version_set.h
  9. 120
      db/version_set_test.cc

@ -509,6 +509,21 @@ class ColumnFamilyData {
FSDirectory* GetDataDir(size_t path_id) const;
// full_history_ts_low_ can only increase.
void SetFullHistoryTsLow(std::string ts_low) {
assert(!ts_low.empty());
const Comparator* ucmp = user_comparator();
assert(ucmp);
if (full_history_ts_low_.empty() ||
ucmp->CompareTimestamp(ts_low, full_history_ts_low_) > 0) {
full_history_ts_low_ = std::move(ts_low);
}
}
const std::string& GetFullHistoryTsLow() const {
return full_history_ts_low_;
}
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
private:
@ -605,6 +620,8 @@ class ColumnFamilyData {
std::vector<std::shared_ptr<FSDirectory>> data_dirs_;
bool db_paths_registered_;
std::string full_history_ts_low_;
};
// ColumnFamilySet has interesting thread-safety requirements

@ -166,7 +166,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
io_tracer_, db_id_, db_session_id_);
io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow());
FileMetaData file_meta;
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
@ -407,7 +407,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */,
thread_pri, io_tracer_, db_id_, db_session_id_));
thread_pri, io_tracer_, db_id_, db_session_id_,
cfd->GetFullHistoryTsLow()));
jobs.back()->PickMemTable();
}
@ -1200,7 +1201,8 @@ Status DBImpl::CompactFilesImpl(
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, Env::Priority::USER, io_tracer_,
&manual_compaction_paused_, db_id_, db_session_id_);
&manual_compaction_paused_, db_id_, db_session_id_,
c->column_family_data()->GetFullHistoryTsLow());
// Creating a compaction influences the compaction score because the score
// takes running compactions into account (by skipping files that are already
@ -3054,7 +3056,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats, thread_pri, io_tracer_,
is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
db_session_id_);
db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
compaction_job.Prepare();
NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,

@ -96,6 +96,7 @@ void VersionEdit::Clear() {
column_family_name_.clear();
is_in_atomic_group_ = false;
remaining_entries_ = 0;
full_history_ts_low_.clear();
}
bool VersionEdit::EncodeTo(std::string* dst) const {
@ -252,6 +253,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kInAtomicGroup);
PutVarint32(dst, remaining_entries_);
}
if (HasFullHistoryTsLow()) {
PutVarint32(dst, kFullHistoryTsLow);
PutLengthPrefixedSlice(dst, full_history_ts_low_);
}
return true;
}
@ -612,6 +618,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kFullHistoryTsLow:
if (!GetLengthPrefixedSlice(&input, &str)) {
msg = "full_history_ts_low";
} else if (str.empty()) {
msg = "full_history_ts_low: empty";
} else {
full_history_ts_low_.assign(str.data(), str.size());
}
break;
default:
if (tag & kTagSafeIgnoreMask) {
// Tag from future which can be safely ignored.
@ -744,6 +760,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
AppendNumberTo(&r, remaining_entries_);
r.append(" entries remains");
}
if (HasFullHistoryTsLow()) {
r.append("\n FullHistoryTsLow: ");
r.append(Slice(full_history_ts_low_).ToString(hex_key));
}
r.append("\n}\n");
return r;
}
@ -873,6 +893,10 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
jw << "AtomicGroup" << remaining_entries_;
}
if (HasFullHistoryTsLow()) {
jw << "FullHistoryTsLow" << Slice(full_history_ts_low_).ToString(hex_key);
}
jw.EndObject();
return jw.Get();

@ -61,6 +61,7 @@ enum Tag : uint32_t {
kBlobFileGarbage,
kWalAddition,
kWalDeletion,
kFullHistoryTsLow,
};
enum NewFileCustomTag : uint32_t {
@ -524,6 +525,16 @@ class VersionEdit {
bool IsInAtomicGroup() const { return is_in_atomic_group_; }
uint32_t GetRemainingEntries() const { return remaining_entries_; }
bool HasFullHistoryTsLow() const { return !full_history_ts_low_.empty(); }
const std::string& GetFullHistoryTsLow() const {
assert(HasFullHistoryTsLow());
return full_history_ts_low_;
}
void SetFullHistoryTsLow(std::string full_history_ts_low) {
assert(!full_history_ts_low.empty());
full_history_ts_low_ = std::move(full_history_ts_low);
}
// return true on success.
bool EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
@ -586,6 +597,8 @@ class VersionEdit {
bool is_in_atomic_group_ = false;
uint32_t remaining_entries_ = 0;
std::string full_history_ts_low_;
};
} // namespace ROCKSDB_NAMESPACE

@ -520,6 +520,10 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
}
if (edit.HasFullHistoryTsLow()) {
const std::string& new_ts = edit.GetFullHistoryTsLow();
cfd->SetFullHistoryTsLow(new_ts);
}
}
if (s.ok()) {

@ -8,8 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_edit.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/coding.h"
#include "util/string_util.h"
@ -505,6 +507,14 @@ TEST_F(VersionEditTest, DeleteWalDebug) {
ASSERT_EQ(edit.DebugJSON(4, true), expected_json);
}
TEST_F(VersionEditTest, FullHistoryTsLow) {
VersionEdit edit;
ASSERT_FALSE(edit.HasFullHistoryTsLow());
std::string ts = test::EncodeInt(0);
edit.SetFullHistoryTsLow(ts);
TestEncodeDecode(edit);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -4044,7 +4044,9 @@ Status VersionSet::ProcessManifestWrites(
}
for (const auto* cfd : *column_family_set_) {
assert(curr_state.find(cfd->GetID()) == curr_state.end());
curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
curr_state.emplace(std::make_pair(
cfd->GetID(),
MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow())));
}
for (const auto& wal : wals_.GetWals()) {
@ -4227,13 +4229,21 @@ Status VersionSet::ProcessManifestWrites(
// Each version in versions corresponds to a column family.
// For each column family, update its log number indicating that logs
// with number smaller than this should be ignored.
// TODO (yanqin): remove the nested loop if possible.
for (const auto version : versions) {
uint64_t max_log_number_in_batch = 0;
assert(version->cfd_);
uint32_t cf_id = version->cfd_->GetID();
std::string full_history_ts_low;
for (const auto& e : batch_edits) {
if (e->has_log_number_ && e->column_family_ == cf_id) {
max_log_number_in_batch =
std::max(max_log_number_in_batch, e->log_number_);
if (e->column_family_ == cf_id) {
if (e->has_log_number_) {
max_log_number_in_batch =
std::max(max_log_number_in_batch, e->log_number_);
}
if (e->HasFullHistoryTsLow()) {
version->cfd_->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
}
}
}
if (max_log_number_in_batch != 0) {
@ -4589,6 +4599,10 @@ Status VersionSet::ExtractInfoFromVersionEdit(
cfd->user_comparator()->Name(),
"does not match existing comparator " + from_edit.comparator_);
}
if (from_edit.HasFullHistoryTsLow()) {
const std::string& new_ts = from_edit.GetFullHistoryTsLow();
cfd->SetFullHistoryTsLow(new_ts);
}
}
if (from_edit.has_prev_log_number_) {
@ -5279,6 +5293,10 @@ Status VersionSet::WriteCurrentStateToManifest(
assert(iter != curr_state.end());
uint64_t log_number = iter->second.log_number;
edit.SetLogNumber(log_number);
const std::string& full_history_ts_low = iter->second.full_history_ts_low;
if (!full_history_ts_low.empty()) {
edit.SetFullHistoryTsLow(full_history_ts_low);
}
std::string record;
if (!edit.EncodeTo(&record)) {
return Status::Corruption(

@ -1278,6 +1278,11 @@ class VersionSet {
struct MutableCFState {
uint64_t log_number;
std::string full_history_ts_low;
explicit MutableCFState() = default;
explicit MutableCFState(uint64_t _log_number, std::string ts_low)
: log_number(_log_number), full_history_ts_low(std::move(ts_low)) {}
};
// Save current contents to *log

@ -869,6 +869,28 @@ class VersionSetTestBase {
mutex_.Unlock();
}
ColumnFamilyData* CreateColumnFamily(const std::string& cf_name,
const ColumnFamilyOptions& cf_options) {
VersionEdit new_cf;
new_cf.AddColumnFamily(cf_name);
uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
new_cf.SetColumnFamily(new_id);
new_cf.SetLogNumber(0);
new_cf.SetComparatorName(cf_options.comparator->Name());
Status s;
mutex_.Lock();
s = versions_->LogAndApply(/*column_family_data=*/nullptr,
MutableCFOptions(cf_options), &new_cf, &mutex_,
/*db_directory=*/nullptr,
/*new_descriptor_log=*/false, &cf_options);
mutex_.Unlock();
EXPECT_OK(s);
ColumnFamilyData* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(cf_name);
EXPECT_NE(nullptr, cfd);
return cfd;
}
Env* mem_env_;
Env* env_;
std::shared_ptr<Env> env_guard_;
@ -1667,6 +1689,104 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) {
}
}
class VersionSetWithTimestampTest : public VersionSetTest {
public:
static const std::string kNewCfName;
explicit VersionSetWithTimestampTest() : VersionSetTest() {}
void SetUp() override {
NewDB();
Options options;
options.comparator = test::ComparatorWithU64Ts();
cfd_ = CreateColumnFamily(kNewCfName, options);
EXPECT_NE(nullptr, cfd_);
EXPECT_NE(nullptr, cfd_->GetLatestMutableCFOptions());
column_families_.emplace_back(kNewCfName, options);
}
void TearDown() override {
for (auto* e : edits_) {
delete e;
}
edits_.clear();
}
void GenVersionEditsToSetFullHistoryTsLow(
const std::vector<uint64_t>& ts_lbs) {
for (const auto ts_lb : ts_lbs) {
VersionEdit* edit = new VersionEdit;
edit->SetColumnFamily(cfd_->GetID());
std::string ts_str = test::EncodeInt(ts_lb);
edit->SetFullHistoryTsLow(ts_str);
edits_.emplace_back(edit);
}
}
void VerifyFullHistoryTsLow(uint64_t expected_ts_low) {
std::unique_ptr<VersionSet> vset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
&write_buffer_manager_, &write_controller_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false,
/*db_id=*/nullptr));
for (auto* cfd : *(vset->GetColumnFamilySet())) {
ASSERT_NE(nullptr, cfd);
if (cfd->GetName() == kNewCfName) {
ASSERT_EQ(test::EncodeInt(expected_ts_low), cfd->GetFullHistoryTsLow());
} else {
ASSERT_TRUE(cfd->GetFullHistoryTsLow().empty());
}
}
}
void DoTest(const std::vector<uint64_t>& ts_lbs) {
if (ts_lbs.empty()) {
return;
}
GenVersionEditsToSetFullHistoryTsLow(ts_lbs);
Status s;
mutex_.Lock();
s = versions_->LogAndApply(cfd_, *(cfd_->GetLatestMutableCFOptions()),
edits_, &mutex_);
mutex_.Unlock();
ASSERT_OK(s);
VerifyFullHistoryTsLow(*std::max_element(ts_lbs.begin(), ts_lbs.end()));
}
protected:
ColumnFamilyData* cfd_{nullptr};
// edits_ must contain and own pointers to heap-alloc VersionEdit objects.
autovector<VersionEdit*> edits_;
};
const std::string VersionSetWithTimestampTest::kNewCfName("new_cf");
TEST_F(VersionSetWithTimestampTest, SetFullHistoryTsLbOnce) {
constexpr uint64_t kTsLow = 100;
DoTest({kTsLow});
}
// Simulate the application increasing full_history_ts_low.
TEST_F(VersionSetWithTimestampTest, IncreaseFullHistoryTsLb) {
const std::vector<uint64_t> ts_lbs = {100, 101, 102, 103};
DoTest(ts_lbs);
}
// Simulate the application trying to decrease full_history_ts_low
// unsuccessfully. If the application calls public API sequentially to
// decrease the lower bound ts, RocksDB will return an InvalidArgument
// status before involving VersionSet. Only when multiple threads trying
// to decrease the lower bound concurrently will this case ever happen. Even
// so, the lower bound cannot be decreased. The application will be notified
// via return value of the API.
TEST_F(VersionSetWithTimestampTest, TryDecreaseFullHistoryTsLb) {
const std::vector<uint64_t> ts_lbs = {103, 102, 101, 100};
DoTest(ts_lbs);
}
class VersionSetAtomicGroupTest : public VersionSetTestBase,
public testing::Test {
public:

Loading…
Cancel
Save