diff --git a/db/builder.cc b/db/builder.cc index 543f957b5..9a78e0f4f 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -69,8 +69,8 @@ TableBuilder* NewTableBuilder( } Status BuildTable( - const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, - const ImmutableCFOptions& ioptions, + const std::string& dbname, VersionSet* versions, + const ImmutableDBOptions& db_options, const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> @@ -91,7 +91,7 @@ Status BuildTable( TableProperties* table_properties, int level, const uint64_t creation_time, const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time, const std::string& db_id, - const std::string& db_session_id) { + const std::string& db_session_id, const std::string* full_history_ts_low) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -120,6 +120,10 @@ Status BuildTable( EventHelpers::NotifyTableFileCreationStarted( ioptions.listeners, dbname, column_family_name, fname, job_id, reason); #endif // !ROCKSDB_LITE + Env* env = db_options.env; + assert(env); + FileSystem* fs = db_options.fs.get(); + assert(fs); TableProperties tp; if (iter->Valid() || !range_del_agg->IsEmpty()) { TableBuilder* builder; @@ -180,7 +184,11 @@ Status BuildTable( &snapshots, earliest_write_conflict_snapshot, snapshot_checker, env, ShouldReportDetailedTime(env, ioptions.statistics), true /* internal key corruption is not ok */, range_del_agg.get(), - blob_file_builder.get(), ioptions.allow_data_in_errors); + blob_file_builder.get(), ioptions.allow_data_in_errors, + /*compaction=*/nullptr, + /*compaction_filter=*/nullptr, /*shutting_down=*/nullptr, + /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr, + db_options.info_log, full_history_ts_low); c_iter.SeekToFirst(); for (; c_iter.Valid(); c_iter.Next()) { diff --git a/db/builder.h b/db/builder.h index 8c80c6379..990b10f3a 100644 --- a/db/builder.h +++ b/db/builder.h @@ -65,8 +65,8 @@ TableBuilder* NewTableBuilder( // @param column_family_name Name of the column family that is also identified // by column_family_id, or empty string if unknown. extern Status BuildTable( - const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs, - const ImmutableCFOptions& options, + const std::string& dbname, VersionSet* versions, + const ImmutableDBOptions& db_options, const ImmutableCFOptions& options, const MutableCFOptions& mutable_cf_options, const FileOptions& file_options, TableCache* table_cache, InternalIterator* iter, std::vector> @@ -89,6 +89,7 @@ extern Status BuildTable( const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET, const uint64_t file_creation_time = 0, const std::string& db_id = "", - const std::string& db_session_id = ""); + const std::string& db_session_id = "", + const std::string* full_history_ts_low = nullptr); } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 43a26da9c..9d82a0a1e 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1347,7 +1347,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, IOStatus io_s; s = BuildTable( - dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(), + dbname_, versions_.get(), immutable_db_options_, *cfd->ioptions(), mutable_cf_options, file_options_for_compaction_, cfd->table_cache(), iter.get(), std::move(range_del_iters), &meta, &blob_file_additions, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), diff --git a/db/flush_job.cc b/db/flush_job.cc index 504a232b6..6b943a567 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -94,7 +94,8 @@ FlushJob::FlushJob( Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::string& db_id, const std::string& db_session_id) + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), @@ -123,7 +124,8 @@ FlushJob::FlushJob( base_(nullptr), pick_memtable_called(false), thread_pri_(thread_pri), - io_tracer_(io_tracer) { + io_tracer_(io_tracer), + full_history_ts_low_(std::move(full_history_ts_low)) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -398,13 +400,14 @@ Status FlushJob::WriteLevel0Table() { : meta_.oldest_ancester_time; IOStatus io_s; + const std::string* const full_history_ts_low = + (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_; s = BuildTable( - dbname_, versions_, db_options_.env, db_options_.fs.get(), - *cfd_->ioptions(), mutable_cf_options_, file_options_, - cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, - &blob_file_additions, cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - cfd_->GetName(), existing_snapshots_, + dbname_, versions_, db_options_, *cfd_->ioptions(), + mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(), + std::move(range_del_iters), &meta_, &blob_file_additions, + cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), + cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, snapshot_checker_, output_compression_, mutable_cf_options_.sample_for_compression, mutable_cf_options_.compression_opts, @@ -412,7 +415,7 @@ Status FlushJob::WriteLevel0Table() { TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, creation_time, oldest_key_time, write_hint, current_time, db_id_, - db_session_id_); + db_session_id_, full_history_ts_low); if (!io_s.ok()) { io_status_ = io_s; } diff --git a/db/flush_job.h b/db/flush_job.h index b724b2464..785cfc9bc 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -73,8 +73,8 @@ class FlushJob { EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::string& db_id = "", - const std::string& db_session_id = ""); + const std::string& db_id = "", const std::string& db_session_id = "", + std::string full_history_ts_low = ""); ~FlushJob(); @@ -164,6 +164,8 @@ class FlushJob { IOStatus io_status_; const std::shared_ptr io_tracer_; + + const std::string full_history_ts_low_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 618594b2d..6b879a300 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -28,49 +28,33 @@ namespace ROCKSDB_NAMESPACE { // TODO(icanadi) Mock out everything else: // 1. VersionSet // 2. Memtable -class FlushJobTest : public testing::Test { - public: - FlushJobTest() +class FlushJobTestBase : public testing::Test { + protected: + FlushJobTestBase(std::string dbname, const Comparator* ucmp) : env_(Env::Default()), fs_(std::make_shared(env_)), - dbname_(test::PerThreadDBPath("flush_job_test")), + dbname_(std::move(dbname)), + ucmp_(ucmp), options_(), db_options_(options_), column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}), table_cache_(NewLRUCache(50000, 16)), write_buffer_manager_(db_options_.db_write_buffer_size), shutting_down_(false), - mock_table_factory_(new mock::MockTableFactory()) { - EXPECT_OK(env_->CreateDirIfMissing(dbname_)); - db_options_.db_paths.emplace_back(dbname_, - std::numeric_limits::max()); - db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); - // TODO(icanadi) Remove this once we mock out VersionSet - NewDB(); - std::vector column_families; - cf_options_.table_factory = mock_table_factory_; - for (const auto& cf_name : column_family_names_) { - column_families.emplace_back(cf_name, cf_options_); - } + mock_table_factory_(new mock::MockTableFactory()) {} - db_options_.env = env_; - db_options_.fs = fs_; - versions_.reset( - new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), - &write_buffer_manager_, &write_controller_, - /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); - EXPECT_OK(versions_->Recover(column_families, false)); + virtual ~FlushJobTestBase() { + if (getenv("KEEP_DB")) { + fprintf(stdout, "db is still in %s\n", dbname_.c_str()); + } else { + EXPECT_OK(DestroyDir(env_, dbname_)); + } } void NewDB() { SetIdentityFile(env_, dbname_); VersionEdit new_db; - if (db_options_.write_dbid_to_manifest) { - DBImpl* impl = new DBImpl(DBOptions(), dbname_); - std::string db_id; - impl->GetDbIdentityFromIdentityFile(&db_id); - new_db.SetDBId(db_id); - } + new_db.SetLogNumber(0); new_db.SetNextFile(2); new_db.SetLastSequence(0); @@ -82,6 +66,7 @@ class FlushJobTest : public testing::Test { VersionEdit new_cf; new_cf.AddColumnFamily(column_family_names_[i]); new_cf.SetColumnFamily(cf_id++); + new_cf.SetComparatorName(ucmp_->Name()); new_cf.SetLogNumber(0); new_cf.SetNextFile(2); new_cf.SetLastSequence(last_seq++); @@ -114,9 +99,37 @@ class FlushJobTest : public testing::Test { ASSERT_OK(s); } + void SetUp() override { + EXPECT_OK(env_->CreateDirIfMissing(dbname_)); + + // TODO(icanadi) Remove this once we mock out VersionSet + NewDB(); + + db_options_.env = env_; + db_options_.fs = fs_; + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits::max()); + db_options_.statistics = CreateDBStatistics(); + + cf_options_.comparator = ucmp_; + + std::vector column_families; + cf_options_.table_factory = mock_table_factory_; + for (const auto& cf_name : column_family_names_) { + column_families.emplace_back(cf_name, cf_options_); + } + + versions_.reset( + new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), + &write_buffer_manager_, &write_controller_, + /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr)); + EXPECT_OK(versions_->Recover(column_families, false)); + } + Env* env_; std::shared_ptr fs_; std::string dbname_; + const Comparator* const ucmp_; EnvOptions env_options_; Options options_; ImmutableDBOptions db_options_; @@ -131,6 +144,13 @@ class FlushJobTest : public testing::Test { std::shared_ptr mock_table_factory_; }; +class FlushJobTest : public FlushJobTestBase { + public: + FlushJobTest() + : FlushJobTestBase(test::PerThreadDBPath("flush_job_test"), + BytewiseComparator()) {} +}; + TEST_F(FlushJobTest, Empty) { JobContext job_context(0); auto cfd = versions_->GetColumnFamilySet()->GetDefault(); @@ -487,6 +507,135 @@ TEST_F(FlushJobTest, Snapshots) { job_context.Clean(); } +class FlushJobTimestampTest : public FlushJobTestBase { + public: + FlushJobTimestampTest() + : FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"), + test::ComparatorWithU64Ts()) {} + + void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts, + SequenceNumber seq, ValueType value_type, + Slice value) { + std::string key_str(std::move(key)); + PutFixed64(&key_str, ts); + memtable->Add(seq, value_type, key_str, value); + } + + protected: + static constexpr uint64_t kStartTs = 10; + static constexpr SequenceNumber kStartSeq = 0; + SequenceNumber curr_seq_{kStartSeq}; + std::atomic curr_ts_{kStartTs}; +}; + +TEST_F(FlushJobTimestampTest, AllKeysExpired) { + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + autovector to_delete; + + { + MemTable* new_mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + new_mem->Ref(); + for (int i = 0; i < 100; ++i) { + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + ValueType::kTypeValue, "0_value"); + } + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + ValueType::kTypeDeletionWithTimestamp, ""); + cfd->imm()->Add(new_mem, &to_delete); + } + + std::vector snapshots; + constexpr SnapshotChecker* const snapshot_checker = nullptr; + JobContext job_context(0); + EventLogger event_logger(db_options_.info_log.get()); + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, std::numeric_limits::max()); + FlushJob flush_job( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, + &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", + /*db_session_id=*/"", full_history_ts_low); + + FileMetaData fmeta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); + mutex_.Unlock(); + + { + std::string key = test::EncodeInt(0); + key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1)); + InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp); + ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode()); + ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode()); + } + + job_context.Clean(); + ASSERT_TRUE(to_delete.empty()); +} + +TEST_F(FlushJobTimestampTest, NoKeyExpired) { + ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault(); + autovector to_delete; + + { + MemTable* new_mem = cfd->ConstructNewMemtable( + *cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber); + new_mem->Ref(); + for (int i = 0; i < 100; ++i) { + uint64_t ts = curr_ts_.fetch_add(1); + SequenceNumber seq = (curr_seq_++); + AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq, + ValueType::kTypeValue, "0_value"); + } + cfd->imm()->Add(new_mem, &to_delete); + } + + std::vector snapshots; + SnapshotChecker* const snapshot_checker = nullptr; + JobContext job_context(0); + EventLogger event_logger(db_options_.info_log.get()); + std::string full_history_ts_low; + PutFixed64(&full_history_ts_low, 0); + FlushJob flush_job( + dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), + nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_, + &shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker, + &job_context, nullptr, nullptr, nullptr, kNoCompression, + db_options_.statistics.get(), &event_logger, true, + true /* sync_output_directory */, true /* write_manifest */, + Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", + /*db_session_id=*/"", full_history_ts_low); + + FileMetaData fmeta; + mutex_.Lock(); + flush_job.PickMemTable(); + ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta)); + mutex_.Unlock(); + + { + std::string ukey = test::EncodeInt(0); + std::string smallest_key = + ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1); + std::string largest_key = ukey + test::EncodeInt(kStartTs); + InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue); + InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue); + ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode()); + ASSERT_EQ(largest.Encode(), fmeta.largest.Encode()); + } + job_context.Clean(); + ASSERT_TRUE(to_delete.empty()); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/repair.cc b/db/repair.cc index 52399ab25..342d9b63e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -442,9 +442,9 @@ class Repairer { LegacyFileSystemWrapper fs(env_); IOStatus io_s; status = BuildTable( - dbname_, /* versions */ nullptr, env_, &fs, *cfd->ioptions(), - *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_.get(), - iter.get(), std::move(range_del_iters), &meta, + dbname_, /* versions */ nullptr, immutable_db_options_, + *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_, + table_cache_.get(), iter.get(), std::move(range_del_iters), &meta, nullptr /* blob_file_additions */, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,