Remove earlier partial BlobDB GC implementation (#6278)

Summary:
In addition to removing the earlier partially implemented garbage collection
logic from the BlobDB codebase, the patch also removes the test cases (as well as
the related sync points, as appropriate) that were only relevant for the old
implementation, and reworks the remaining ones so they use the new GC logic.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6278

Test Plan: `make check`

Differential Revision: D19335226

Pulled By: ltamasi

fbshipit-source-id: 0cc1794bc9892feda1426ed5522a318f3cb1b692
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent 76c117b24b
commit 1dd7873e08
  1. 3
      file/sst_file_manager_impl.cc
  2. 3
      utilities/blob_db/blob_compaction_filter.cc
  3. 2
      utilities/blob_db/blob_db.h
  4. 337
      utilities/blob_db/blob_db_impl.cc
  5. 43
      utilities/blob_db/blob_db_impl.h
  6. 2
      utilities/blob_db/blob_db_iterator.h
  7. 416
      utilities/blob_db/blob_db_test.cc
  8. 6
      utilities/blob_db/blob_file.h

@ -418,7 +418,8 @@ bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
Status SstFileManagerImpl::ScheduleFileDeletion(
const std::string& file_path, const std::string& path_to_sync,
const bool force_bg) {
TEST_SYNC_POINT("SstFileManagerImpl::ScheduleFileDeletion");
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
const_cast<std::string*>(&file_path));
return delete_scheduler_.DeleteFile(file_path, path_to_sync,
force_bg);
}

@ -32,8 +32,7 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
if (!blob_index.IsInlined() &&
blob_index.file_number() < context_.next_file_number &&
context_.current_blob_files.count(blob_index.file_number()) == 0) {
// Corresponding blob file gone. Could have been garbage collected or
// evicted by FIFO eviction.
// Corresponding blob file gone (most likely, evicted by FIFO eviction).
evicted_count_++;
evicted_size_ += key.size() + value.size();
return Decision::kRemove;

@ -20,8 +20,6 @@ namespace blob_db {
// A wrapped database which puts values of KV pairs in a separate log
// and store location to the log in the underlying DB.
// It lacks lots of importatant functionalities, e.g. DB restarts,
// garbage collection, iterators, etc.
//
// The factory needs to be moved to include/rocksdb/utilities to allow
// users to use blob DB.

@ -940,7 +940,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) {
StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_PUT);
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
Status s;
WriteBatch batch;
{
@ -953,7 +952,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
if (s.ok()) {
s = db_->Write(options, &batch);
}
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
return s;
}
@ -1531,8 +1529,6 @@ Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
get_impl_options.value = &index_entry;
get_impl_options.is_blob_index = &is_blob_index;
s = db_impl_->GetImpl(ro, key, get_impl_options);
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
if (expiration != nullptr) {
*expiration = kNoExpiration;
}
@ -1836,321 +1832,6 @@ std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
return std::make_pair(true, -1);
}
// Write callback for garbage collection to check if key has been updated
// since last read. Similar to how OptimisticTransaction works. See inline
// comment in GCFileAndUpdateLSM().
class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
public:
GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
SequenceNumber upper_bound)
: cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
Status Callback(DB* db) override {
auto* db_impl = static_cast_with_check<DBImpl, DB>(db);
auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
SequenceNumber latest_seq = 0;
bool found_record_for_key = false;
bool is_blob_index = false;
Status s = db_impl->GetLatestSequenceForKey(
sv, key_, false /*cache_only*/, 0 /*lower_bound_seq*/, &latest_seq,
&found_record_for_key, &is_blob_index);
db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
if (!s.ok() && !s.IsNotFound()) {
// Error.
assert(!s.IsBusy());
return s;
}
if (s.IsNotFound()) {
assert(!found_record_for_key);
return Status::Busy("Key deleted");
}
assert(found_record_for_key);
assert(is_blob_index);
if (latest_seq > upper_bound_) {
return Status::Busy("Key overwritten");
}
return s;
}
bool AllowWriteBatching() override { return false; }
private:
ColumnFamilyData* cfd_;
// Key to check
Slice key_;
// Upper bound of sequence number to proceed.
SequenceNumber upper_bound_;
};
// iterate over the blobs sequentially and check if the blob sequence number
// is the latest. If it is the latest, preserve it, otherwise delete it
// if it is TTL based, and the TTL has expired, then
// we can blow the entity if the key is still the latest or the Key is not
// found
// WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob
// without doing anything if the earliest snapshot is not
// referring to that sequence number, i.e. it is later than the sequence number
// of the new key
//
// if it is not TTL based, then we can blow the key if the key has been
// DELETED in the LSM
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gc_stats) {
StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
uint64_t now = EpochNow();
std::shared_ptr<Reader> reader =
bfptr->OpenRandomAccessReader(env_, db_options_, env_options_);
if (!reader) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File sequential reader could not be opened for %s",
bfptr->PathName().c_str());
return Status::IOError("failed to create sequential reader");
}
BlobLogHeader header;
Status s = reader->ReadHeader(&header);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failure to read header for blob-file %s",
bfptr->PathName().c_str());
return s;
}
auto cfh = db_impl_->DefaultColumnFamily();
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto column_family_id = cfd->GetID();
bool has_ttl = header.has_ttl;
// this reads the key but skips the blob
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
ExpirationRange expiration_range;
{
ReadLock file_lock(&bfptr->mutex_);
expiration_range = bfptr->GetExpirationRange();
}
bool file_expired = has_ttl && now >= expiration_range.second;
if (!file_expired) {
// read the blob because you have to write it back to new file
shallow = Reader::kReadHeaderKeyBlob;
}
BlobLogRecord record;
std::shared_ptr<BlobFile> newfile;
std::shared_ptr<Writer> new_writer;
uint64_t blob_offset = 0;
while (true) {
assert(s.ok());
// Read the next blob record.
Status read_record_status =
reader->ReadRecord(&record, shallow, &blob_offset);
// Exit if we reach the end of blob file.
// TODO(yiwu): properly handle ReadRecord error.
if (!read_record_status.ok()) {
break;
}
gc_stats->blob_count++;
// Similar to OptimisticTransaction, we obtain latest_seq from
// base DB, which is guaranteed to be no smaller than the sequence of
// current key. We use a WriteCallback on write to check the key sequence
// on write. If the key sequence is larger than latest_seq, we know
// a new versions is inserted and the old blob can be disgard.
//
// We cannot use OptimisticTransaction because we need to pass
// is_blob_index flag to GetImpl.
SequenceNumber latest_seq = GetLatestSequenceNumber();
bool is_blob_index = false;
PinnableSlice index_entry;
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cfh;
get_impl_options.value = &index_entry;
get_impl_options.is_blob_index = &is_blob_index;
Status get_status =
db_impl_->GetImpl(ReadOptions(), record.key, get_impl_options);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
if (!get_status.ok() && !get_status.IsNotFound()) {
// error
s = get_status;
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while getting index entry: %s",
s.ToString().c_str());
break;
}
if (get_status.IsNotFound() || !is_blob_index) {
// Either the key is deleted or updated with a newer version whish is
// inlined in LSM.
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
continue;
}
BlobIndex blob_index;
s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while decoding index entry: %s",
s.ToString().c_str());
break;
}
if (blob_index.IsInlined() ||
blob_index.file_number() != bfptr->BlobFileNumber() ||
blob_index.offset() != blob_offset) {
// Key has been overwritten. Drop the blob record.
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
continue;
}
GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
// If key has expired, remove it from base DB.
// TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
// We can just drop the blob record.
if (file_expired || (has_ttl && now >= record.expiration)) {
gc_stats->num_keys_expired++;
gc_stats->bytes_expired += record.record_size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
WriteBatch delete_batch;
Status delete_status = delete_batch.Delete(record.key);
if (delete_status.ok()) {
delete_status = db_impl_->WriteWithCallback(WriteOptions(),
&delete_batch, &callback);
}
if (!delete_status.ok() && !delete_status.IsBusy()) {
// We hit an error.
s = delete_status;
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while deleting expired key: %s",
s.ToString().c_str());
break;
}
// Continue to next blob record or retry.
continue;
}
// Relocate the blob record to new file.
if (!newfile) {
// new file
std::string reason("GC of ");
reason += bfptr->PathName();
newfile = NewBlobFile(bfptr->HasTTL(), bfptr->expiration_range_, reason);
s = CheckOrCreateWriterLocked(newfile, &new_writer);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to open file %s for writer, error: %s",
newfile->PathName().c_str(), s.ToString().c_str());
break;
}
newfile->file_size_ = BlobLogHeader::kSize;
s = new_writer->WriteHeader(newfile->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File: %s - header writing failed",
newfile->PathName().c_str());
break;
}
// We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
// avoid user writes writing to the file, and avoid
// EvictExpiredFiles close the file by mistake.
WriteLock wl(&mutex_);
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
}
std::string new_index_entry;
uint64_t new_blob_offset = 0;
uint64_t new_key_offset = 0;
// write the blob to the blob log.
s = new_writer->AddRecord(record.key, record.value, record.expiration,
&new_key_offset, &new_blob_offset);
BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
new_blob_offset, record.value.size(),
bdb_options_.compression);
newfile->blob_count_++;
newfile->file_size_ +=
BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
WriteBatch rewrite_batch;
Status rewrite_status = WriteBatchInternal::PutBlobIndex(
&rewrite_batch, column_family_id, record.key, new_index_entry);
if (rewrite_status.ok()) {
rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
&rewrite_batch, &callback);
}
if (rewrite_status.ok()) {
gc_stats->num_keys_relocated++;
gc_stats->bytes_relocated += record.record_size();
} else if (rewrite_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
} else {
// We hit an error.
s = rewrite_status;
ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
s.ToString().c_str());
break;
}
} // end of ReadRecord loop
{
WriteLock wl(&mutex_);
ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/);
}
ROCKS_LOG_INFO(
db_options_.info_log,
"%s blob file %" PRIu64 ". Total blob records: %" PRIu64
", expired: %" PRIu64 " keys/%" PRIu64
" bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64
" bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.",
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
gc_stats->bytes_relocated);
RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
gc_stats->num_keys_overwritten);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
gc_stats->num_keys_expired);
RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
gc_stats->bytes_overwritten);
RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
if (newfile != nullptr) {
{
MutexLock l(&write_mutex_);
WriteLock lock(&mutex_);
WriteLock file_lock(&newfile->mutex_);
CloseBlobFile(newfile);
}
total_blob_size_ += newfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
newfile->BlobFileNumber());
RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
gc_stats->num_keys_relocated);
RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
gc_stats->bytes_relocated);
}
if (!s.ok()) {
RecordTick(statistics_, BLOB_DB_GC_FAILURES);
}
return s;
}
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
if (aborted) {
return std::make_pair(false, -1);
@ -2240,17 +1921,6 @@ void BlobDBImpl::CopyBlobFiles(
}
}
std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
if (aborted) {
return std::make_pair(false, -1);
}
// TODO(yiwu): Garbage collection implementation.
// reschedule
return std::make_pair(true, -1);
}
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
@ -2365,13 +2035,6 @@ void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
}
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats) {
return GCFileAndUpdateLSM(bfile, gc_stats);
}
void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
void BlobDBImpl::TEST_EvictExpiredFiles() {
EvictExpiredFiles(false /*abort*/);
}

@ -60,20 +60,11 @@ struct BlobFileComparator {
const std::shared_ptr<BlobFile>& rhs) const;
};
struct GCStats {
uint64_t blob_count = 0;
uint64_t num_keys_overwritten = 0;
uint64_t num_keys_expired = 0;
uint64_t num_keys_relocated = 0;
uint64_t bytes_overwritten = 0;
uint64_t bytes_expired = 0;
uint64_t bytes_relocated = 0;
};
/**
* The implementation class for BlobDB. This manages the value
* part in TTL aware sequentially written files. These files are
* Garbage Collected.
* The implementation class for BlobDB. It manages the blob logs, which
* are sequentially written files. Blob logs can be of the TTL or non-TTL
* varieties; the former are cleaned up when they expire, while the latter
* are (optionally) garbage collected.
*/
class BlobDBImpl : public BlobDB {
friend class BlobFile;
@ -86,12 +77,6 @@ class BlobDBImpl : public BlobDB {
// deletions check period
static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
// gc percentage each check period
static constexpr uint32_t kGCFilePercentage = 100;
// gc period
static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000;
// sanity check task
static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
@ -208,11 +193,6 @@ class BlobDBImpl : public BlobDB {
SequenceNumber obsolete_seq = 0,
bool update_size = true);
Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats);
void TEST_RunGC();
void TEST_EvictExpiredFiles();
void TEST_DeleteObsoleteFiles();
@ -231,7 +211,6 @@ class BlobDBImpl : public BlobDB {
#endif // !NDEBUG
private:
class GarbageCollectionWriteCallback;
class BlobInserter;
// Create a snapshot if there isn't one in read options.
@ -300,14 +279,10 @@ class BlobDBImpl : public BlobDB {
// periodic sanity check. Bunch of checks
std::pair<bool, int64_t> SanityCheck(bool aborted);
// delete files which have been garbage collected and marked
// obsolete. Check whether any snapshots exist which refer to
// the same
// Delete files that have been marked obsolete (either because of TTL
// or GC). Check whether any snapshots exist which refer to the same.
std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
// Major task to garbage collect expired and deleted blobs
std::pair<bool, int64_t> RunGC(bool aborted);
// periodically check if open blob files and their TTL's has expired
// if expired, close the sequential writer and make the file immutable
std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
@ -398,12 +373,6 @@ class BlobDBImpl : public BlobDB {
Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
std::shared_ptr<Writer>* writer);
// Iterate through keys and values on Blob and write into
// separate file the remaining blobs and delete/update pointers
// in LSM atomically
Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gcstats);
// checks if there is no snapshot which is referencing the
// blobs
bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);

@ -117,8 +117,6 @@ class BlobDBIterator : public Iterator {
private:
// Return true if caller should continue to next value.
bool UpdateBlobValue() {
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
value_.Reset();
status_ = Status::OK();
if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {

@ -341,10 +341,6 @@ TEST_F(BlobDBTest, PutWithTTL) {
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
@ -371,10 +367,6 @@ TEST_F(BlobDBTest, PutUntil) {
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
@ -588,243 +580,6 @@ TEST_F(BlobDBTest, MultipleWriters) {
VerifyDB(data);
}
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB());
std::map<std::string, std::string> data;
for (int i = 0; i < 200; i++) {
PutRandom("key" + ToString(i), &rnd, &data);
}
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
// Test for data in SST
size_t new_keys = 0;
for (int i = 0; i < 100; i++) {
if (rnd.Next() % 2 == 1) {
new_keys++;
PutRandom("key" + ToString(i), &rnd, &data);
}
}
db_impl->TEST_FlushMemTable(true /*wait*/);
// Test for data in memtable
for (int i = 100; i < 200; i++) {
if (rnd.Next() % 2 == 1) {
new_keys++;
PutRandom("key" + ToString(i), &rnd, &data);
}
}
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(200, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated);
VerifyDB(data);
}
TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
SyncPoint::GetInstance()->EnableProcessing();
auto writer = port::Thread(
[this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(1, gc_stats.num_keys_overwritten);
ASSERT_EQ(0, gc_stats.num_keys_relocated);
writer.join();
VerifyDB({{"foo", "v2"}});
}
TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_current_time(100);
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
mock_env_->set_current_time(300);
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
SyncPoint::GetInstance()->EnableProcessing();
auto writer = port::Thread([this]() {
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400));
});
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_keys_expired);
ASSERT_EQ(0, gc_stats.num_keys_relocated);
writer.join();
VerifyDB({{"foo", "v2"}});
}
TEST_F(BlobDBTest, NewFileGeneratedFromGCShouldMarkAsImmutable) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
ASSERT_OK(Put("foo", "bar"));
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
auto blob_file1 = blob_files[0];
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file1));
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_file1, &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_keys_relocated);
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(blob_file1, blob_files[0]);
ASSERT_TRUE(blob_files[1]->Immutable());
}
// This test is no longer valid since we now return an error when we go
// over the configured max_db_size.
// The test needs to be re-written later in such a way that writes continue
// after a GC happens.
TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
// Use mock env to stop wall clock.
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.max_db_size = 100;
bdb_options.blob_file_size = 100;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::string value(100, 'v');
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60));
for (int i = 0; i < 10; i++) {
ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
}
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(11, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_TRUE(blob_files[0]->Immutable());
for (int i = 1; i <= 10; i++) {
ASSERT_FALSE(blob_files[i]->HasTTL());
if (i < 10) {
ASSERT_TRUE(blob_files[i]->Immutable());
}
}
blob_db_impl()->TEST_RunGC();
// The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_EQ(blob_files[1]->BlobFileNumber(),
obsolete_files[0]->BlobFileNumber());
}
TEST_F(BlobDBTest, ReadWhileGC) {
// run the same test for Get(), MultiGet() and Iterator each.
for (int i = 0; i < 2; i++) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
blob_db_->Put(WriteOptions(), "foo", "bar");
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
uint64_t bfile_number = bfile->BlobFileNumber();
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
switch (i) {
case 0:
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::Get:AfterIndexEntryGet:1",
"BlobDBTest::ReadWhileGC:1"},
{"BlobDBTest::ReadWhileGC:2",
"BlobDBImpl::Get:AfterIndexEntryGet:2"}});
break;
case 1:
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBIterator::UpdateBlobValue:Start:1",
"BlobDBTest::ReadWhileGC:1"},
{"BlobDBTest::ReadWhileGC:2",
"BlobDBIterator::UpdateBlobValue:Start:2"}});
break;
}
SyncPoint::GetInstance()->EnableProcessing();
auto reader = port::Thread([this, i]() {
std::string value;
std::vector<std::string> values;
std::vector<Status> statuses;
switch (i) {
case 0:
ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value));
ASSERT_EQ("bar", value);
break;
case 1:
// VerifyDB use iterator to scan the DB.
VerifyDB({{"foo", "bar"}});
break;
}
});
TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_keys_relocated);
blob_db_impl()->TEST_DeleteObsoleteFiles();
// The file shouln't be deleted
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
reader.join();
SyncPoint::GetInstance()->DisableProcessing();
// The file is deleted this time
blob_db_impl()->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB({{"foo", "bar"}});
Destroy();
}
}
TEST_F(BlobDBTest, SstFileManager) {
// run the same test for Get(), MultiGet() and Iterator each.
std::shared_ptr<SstFileManager> sst_file_manager(
@ -835,16 +590,20 @@ TEST_F(BlobDBTest, SstFileManager) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 1.0;
Options db_options;
int files_deleted_directly = 0;
int files_scheduled_to_delete = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion",
[&](void * /*arg*/) { files_scheduled_to_delete++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile",
[&](void * /*arg*/) { files_deleted_directly++; });
"SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
assert(arg);
const std::string *const file_path =
static_cast<const std::string *>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
SyncPoint::GetInstance()->EnableProcessing();
db_options.sst_file_manager = sst_file_manager;
@ -856,34 +615,29 @@ TEST_F(BlobDBTest, SstFileManager) {
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_impl()->TEST_DeleteObsoleteFiles();
// Even if SSTFileManager is not set, DB is creating a dummy one.
ASSERT_EQ(1, files_scheduled_to_delete);
ASSERT_EQ(0, files_deleted_directly);
Destroy();
// Make sure that DestroyBlobDB() also goes through delete scheduler.
ASSERT_GE(files_scheduled_to_delete, 2);
// Due to a timing issue, the WAL may or may not be deleted directly. The
// blob file is first scheduled, followed by WAL. If the background trash
// thread does not wake up on time, the WAL file will be directly
// deleted as the trash size will be > DB size
ASSERT_LE(files_deleted_directly, 1);
ASSERT_EQ(2, files_scheduled_to_delete);
SyncPoint::GetInstance()->DisableProcessing();
sfm->WaitForEmptyTrash();
}
TEST_F(BlobDBTest, SstFileManagerRestart) {
int files_deleted_directly = 0;
int files_scheduled_to_delete = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion",
[&](void * /*arg*/) { files_scheduled_to_delete++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile",
[&](void * /*arg*/) { files_deleted_directly++; });
"SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
assert(arg);
const std::string *const file_path =
static_cast<const std::string *>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
// run the same test for Get(), MultiGet() and Iterator each.
std::shared_ptr<SstFileManager> sst_file_manager(
@ -912,9 +666,7 @@ TEST_F(BlobDBTest, SstFileManagerRestart) {
// Make sure that reopening the DB rescan the existing trash files
Open(bdb_options, db_options);
ASSERT_GE(files_scheduled_to_delete, 3);
// Depending on timing, the WAL file may or may not be directly deleted
ASSERT_LE(files_deleted_directly, 1);
ASSERT_EQ(files_scheduled_to_delete, 2);
sfm->WaitForEmptyTrash();
@ -937,67 +689,68 @@ TEST_F(BlobDBTest, SstFileManagerRestart) {
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.enable_garbage_collection = true;
bdb_options.garbage_collection_cutoff = 1.0;
bdb_options.disable_background_tasks = true;
// i = when to take snapshot
for (int i = 0; i < 4; i++) {
for (bool delete_key : {true, false}) {
const Snapshot *snapshot = nullptr;
Destroy();
Open(bdb_options);
// First file
ASSERT_OK(Put("key1", "value"));
if (i == 0) {
snapshot = blob_db_->GetSnapshot();
}
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
// Second file
ASSERT_OK(Put("key2", "value"));
if (i == 1) {
snapshot = blob_db_->GetSnapshot();
}
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
auto bfile = blob_files[1];
ASSERT_FALSE(bfile->Immutable());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
// Third file
ASSERT_OK(Put("key3", "value"));
if (i == 2) {
snapshot = blob_db_->GetSnapshot();
}
if (delete_key) {
Delete("key2");
}
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_TRUE(bfile->Obsolete());
ASSERT_EQ(1, gc_stats.blob_count);
if (delete_key) {
ASSERT_EQ(0, gc_stats.num_keys_relocated);
} else {
ASSERT_EQ(1, gc_stats.num_keys_relocated);
}
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
if (i == 3) {
snapshot = blob_db_->GetSnapshot();
}
size_t num_files = delete_key ? 3 : 4;
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
Destroy();
Open(bdb_options);
const Snapshot *snapshot = nullptr;
// First file
ASSERT_OK(Put("key1", "value"));
if (i == 0) {
snapshot = blob_db_->GetSnapshot();
}
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
// Second file
ASSERT_OK(Put("key2", "value"));
if (i == 1) {
snapshot = blob_db_->GetSnapshot();
}
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
auto bfile = blob_files[1];
ASSERT_FALSE(bfile->Immutable());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
// Third file
ASSERT_OK(Put("key3", "value"));
if (i == 2) {
snapshot = blob_db_->GetSnapshot();
}
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_TRUE(bfile->Obsolete());
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
Delete("key2");
if (i == 3) {
snapshot = blob_db_->GetSnapshot();
}
ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (i >= 2) {
// The snapshot shouldn't see data in bfile
ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
} else {
// The snapshot will see data in bfile, so the file shouldn't be deleted
ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (i == 3) {
// The snapshot shouldn't see data in bfile
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
} else {
// The snapshot will see data in bfile, so the file shouldn't be deleted
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
}
ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
}
}
}
@ -1434,9 +1187,7 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
// filtered regardless of snapshot.
const Snapshot *snapshot = blob_db_->GetSnapshot();
// Issue manual compaction to trigger compaction filter.
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
blob_db_->DefaultColumnFamily(), nullptr,
nullptr));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
blob_db_->ReleaseSnapshot(snapshot);
// Verify expired blob index are filtered.
std::vector<KeyVersion> versions;
@ -1450,7 +1201,7 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
}
// Test compaction filter should remove any blob index where corresponding
// blob file has been removed (either by FIFO or garbage collection).
// blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
@ -1481,7 +1232,6 @@ TEST_F(BlobDBTest, FilterFileNotAvailable) {
ASSERT_EQ("foo", versions[1].user_key);
VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
ASSERT_OK(blob_db_->Flush(FlushOptions()));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
ASSERT_EQ(2, versions.size());
@ -1720,9 +1470,7 @@ TEST_F(BlobDBTest, GarbageCollection) {
mock_env_->set_current_time(kCompactTime);
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
blob_db_->DefaultColumnFamily(), nullptr,
nullptr));
ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// We expect the data to remain the same and the blobs from the oldest N files
// to be moved to new files. Sequence numbers get zeroed out during the

@ -76,7 +76,7 @@ class BlobFile {
// The latest sequence number when the file was closed/made immutable.
SequenceNumber immutable_sequence_{0};
// has a pass of garbage collection successfully finished on this file
// Whether the file was marked obsolete (due to either TTL or GC).
// obsolete_ still needs to do iterator/snapshot checks
std::atomic<bool> obsolete_{false};
@ -168,13 +168,13 @@ class BlobFile {
return immutable_sequence_;
}
// if the file has gone through GC and blobs have been relocated
// Whether the file was marked obsolete (due to either TTL or GC).
bool Obsolete() const {
assert(Immutable() || !obsolete_.load());
return obsolete_.load();
}
// Mark file as obsolete by garbage collection. The file is not visible to
// Mark file as obsolete (due to either TTL or GC). The file is not visible to
// snapshots with sequence greater or equal to the given sequence.
void MarkObsolete(SequenceNumber sequence);

Loading…
Cancel
Save