Blob DB: fix snapshot handling

Summary:
Blob db will keep blob file if data in the file is visible to an active snapshot. Before this patch it checks whether there is an active snapshot has sequence number greater than the earliest sequence in the file. This is problematic since we take snapshot on every read, if it keep having reads, old blob files will not be cleanup. Change to check if there is an active snapshot falls in the range of [earliest_sequence, obsolete_sequence) where obsolete sequence is
1. if data is relocated to another file by garbage collection, it is the latest sequence at the time garbage collection finish
2. otherwise, it is the latest sequence of the file
Closes https://github.com/facebook/rocksdb/pull/3087

Differential Revision: D6182519

Pulled By: yiwu-arbug

fbshipit-source-id: cdf4c35281f782eb2a9ad6a87b6727bbdff27a45
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent f662f8f0b6
commit 7bfa88037e
  1. 8
      db/db_impl.cc
  2. 4
      db/db_impl.h
  3. 16
      db/snapshot_impl.h
  4. 90
      utilities/blob_db/blob_db_impl.cc
  5. 3
      utilities/blob_db/blob_db_impl.h
  6. 148
      utilities/blob_db/blob_db_test.cc
  7. 18
      utilities/blob_db/blob_file.cc
  8. 30
      utilities/blob_db/blob_file.h

@ -1663,12 +1663,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
delete casted_s;
}
bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound) {
InstrumentedMutexLock l(&mutex_);
if (snapshots_.empty()) {
return false;
}
return (snapshots_.newest()->GetSequenceNumber() >= sn);
return snapshots_.HasSnapshotInRange(lower_bound, upper_bound);
}
#ifndef ROCKSDB_LITE

@ -227,7 +227,9 @@ class DBImpl : public DB {
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
bool HasActiveSnapshotLaterThanSN(SequenceNumber sn);
// Whether there is an active snapshot in range [lower_bound, upper_bound).
bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound);
#ifndef ROCKSDB_LITE
using DB::ResetStats;

@ -108,6 +108,22 @@ class SnapshotList {
return ret;
}
// Whether there is an active snapshot in range [lower_bound, upper_bound).
bool HasSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound) {
if (empty()) {
return false;
}
const SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
if (s->next_->number_ >= lower_bound) {
return s->next_->number_ < upper_bound;
}
s = s->next_;
}
return false;
}
// get the sequence number of the most recent snapshot
SequenceNumber GetNewest() {
if (empty()) {

@ -956,10 +956,24 @@ bool BlobDBImpl::EvictOldestBlobFile() {
}
WriteLock wl(&mutex_);
oldest_file->SetCanBeDeleted();
obsolete_files_.push_front(oldest_file);
oldest_file_evicted_.store(true);
return true;
// Double check the file is not obsolete by others
if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) {
auto expiration_range = oldest_file->GetExpirationRange();
ROCKS_LOG_INFO(db_options_.info_log,
"Evict oldest blob file since DB out of space. Current "
"space used: %" PRIu64 ", blob dir size: %" PRIu64
", evicted blob file #%" PRIu64
" with expiration range (%" PRIu64 ", %" PRIu64 ").",
total_blob_space_.load(), bdb_options_.blob_dir_size,
oldest_file->BlobFileNumber(), expiration_range.first,
expiration_range.second);
oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second);
obsolete_files_.push_back(oldest_file);
oldest_file_evicted_.store(true);
return true;
}
return false;
}
Status BlobDBImpl::CheckSize(size_t blob_size) {
@ -1299,27 +1313,12 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
return CloseBlobFile(bfile);
}
bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
bool BlobDBImpl::VisibleToActiveSnapshot(
const std::shared_ptr<BlobFile>& bfile) {
assert(bfile->Obsolete());
SequenceNumber esn = bfile->GetSequenceRange().first;
// TODO(yiwu): Here we should check instead if there is an active snapshot
// lies between the first sequence in the file, and the last sequence by
// the time the file finished being garbage collect.
bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn);
if (notok) {
ROCKS_LOG_INFO(db_options_.info_log,
"Could not delete file due to snapshot failure %s",
bfile->PathName().c_str());
return false;
} else {
ROCKS_LOG_INFO(db_options_.info_log,
"Will delete file due to snapshot success %s",
bfile->PathName().c_str());
return true;
}
SequenceNumber first_sequence = bfile->GetSequenceRange().first;
SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence);
}
bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
@ -1697,7 +1696,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
if (!get_status.ok() && !get_status.ok()) {
if (!get_status.ok() && !get_status.IsNotFound()) {
// error
s = get_status;
ROCKS_LOG_ERROR(db_options_.info_log,
@ -1814,6 +1813,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
&rewrite_batch, &callback);
}
if (rewrite_status.ok()) {
newfile->ExtendSequenceRange(
WriteBatchInternal::Sequence(&rewrite_batch));
gc_stats->relocate_succeeded++;
} else if (rewrite_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
@ -1827,6 +1828,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
}
} // end of ReadRecord loop
if (s.ok()) {
SequenceNumber obsolete_sequence =
newfile == nullptr ? bfptr->GetSequenceRange().second + 1
: newfile->GetSequenceRange().second;
bfptr->MarkObsolete(obsolete_sequence);
if (!first_gc) {
WriteLock wl(&mutex_);
obsolete_files_.push_back(bfptr);
}
}
ROCKS_LOG_INFO(
db_options_.info_log,
"%s blob file %" PRIu64
@ -1935,11 +1947,17 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
auto bfile = *iter;
{
ReadLock lockbfile_r(&bfile->mutex_);
if (!FileDeleteOk_SnapshotCheckLocked(bfile)) {
if (VisibleToActiveSnapshot(bfile)) {
ROCKS_LOG_INFO(db_options_.info_log,
"Could not delete file due to snapshot failure %s",
bfile->PathName().c_str());
++iter;
continue;
}
}
ROCKS_LOG_INFO(db_options_.info_log,
"Will delete file due to snapshot success %s",
bfile->PathName().c_str());
blob_files_.erase(bfile->BlobFileNumber());
Status s = env_->DeleteFile(bfile->PathName());
@ -2069,8 +2087,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
FilterSubsetOfFiles(blob_files, &to_process, current_epoch_,
files_to_collect);
// in this collect the set of files, which became obsolete
std::vector<std::shared_ptr<BlobFile>> obsoletes;
for (auto bfile : to_process) {
GCStats gc_stats;
Status s = GCFileAndUpdateLSM(bfile, &gc_stats);
@ -2084,16 +2100,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
bfile->deleted_size_ = gc_stats.deleted_size;
bfile->deleted_count_ = gc_stats.num_deletes;
bfile->gc_once_after_open_ = false;
} else {
obsoletes.push_back(bfile);
}
}
if (!obsoletes.empty()) {
WriteLock wl(&mutex_);
for (auto bfile : obsoletes) {
bfile->SetCanBeDeleted();
obsolete_files_.push_front(bfile);
}
}
@ -2190,16 +2196,6 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
}
void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile) {
uint64_t number = bfile->BlobFileNumber();
assert(blob_files_.count(number) > 0);
bfile->SetCanBeDeleted();
{
WriteLock l(&mutex_);
obsolete_files_.push_back(bfile);
}
}
#endif // !NDEBUG
} // namespace blob_db

@ -279,8 +279,6 @@ class BlobDBImpl : public BlobDB {
void TEST_RunGC();
void TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile);
void TEST_DeleteObsoleteFiles();
#endif // !NDEBUG
@ -411,6 +409,7 @@ class BlobDBImpl : public BlobDB {
// checks if there is no snapshot which is referencing the
// blobs
bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue);

@ -63,6 +63,22 @@ class BlobDBTest : public testing::Test {
}
}
BlobDBImpl *blob_db_impl() {
return reinterpret_cast<BlobDBImpl *>(blob_db_);
}
Status Put(const Slice &key, const Slice &value) {
return blob_db_->Put(WriteOptions(), key, value);
}
void Delete(const std::string &key,
std::map<std::string, std::string> *data = nullptr) {
ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
if (data != nullptr) {
data->erase(key);
}
}
void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
std::map<std::string, std::string> *data = nullptr) {
int len = rnd->Next() % kMaxBlobSize + 1;
@ -111,14 +127,6 @@ class BlobDBTest : public testing::Test {
}
}
void Delete(const std::string &key,
std::map<std::string, std::string> *data = nullptr) {
ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
if (data != nullptr) {
data->erase(key);
}
}
// Verify blob db contain expected data and nothing more.
void VerifyDB(const std::map<std::string, std::string> &data) {
VerifyDB(blob_db_, data);
@ -593,16 +601,14 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB());
std::map<std::string, std::string> data;
for (int i = 0; i < 200; i++) {
PutRandom("key" + ToString(i), &rnd, &data);
}
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
// Test for data in SST
size_t new_keys = 0;
for (int i = 0; i < 100; i++) {
@ -620,7 +626,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
}
}
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(200, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(200 - new_keys, gc_stats.num_relocate);
@ -634,11 +640,9 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
bdb_options.disable_background_tasks = true;
Open(bdb_options);
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
@ -651,7 +655,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
[this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(1, gc_stats.num_relocate);
@ -671,11 +675,9 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
Open(bdb_options, options);
mock_env_->set_current_time(100);
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
mock_env_->set_current_time(300);
SyncPoint::GetInstance()->LoadDependency(
@ -690,7 +692,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
});
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_deletes);
ASSERT_EQ(0, gc_stats.delete_succeeded);
@ -719,9 +721,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
for (int i = 0; i < 10; i++) {
ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
}
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(11, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_TRUE(blob_files[0]->Immutable());
@ -731,9 +731,9 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
ASSERT_TRUE(blob_files[i]->Immutable());
}
}
blob_db_impl->TEST_RunGC();
blob_db_impl()->TEST_RunGC();
// The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles();
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_EQ(blob_files[1]->BlobFileNumber(),
obsolete_files[0]->BlobFileNumber());
@ -747,13 +747,11 @@ TEST_F(BlobDBTest, ReadWhileGC) {
bdb_options.disable_background_tasks = true;
Open(bdb_options);
blob_db_->Put(WriteOptions(), "foo", "bar");
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
std::shared_ptr<BlobFile> bfile = blob_files[0];
uint64_t bfile_number = bfile->BlobFileNumber();
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile));
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
switch (i) {
case 0:
@ -791,17 +789,16 @@ TEST_F(BlobDBTest, ReadWhileGC) {
TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(1, gc_stats.relocate_succeeded);
blob_db_impl->TEST_ObsoleteFile(blob_files[0]);
blob_db_impl->TEST_DeleteObsoleteFiles();
blob_db_impl()->TEST_DeleteObsoleteFiles();
// The file shouln't be deleted
blob_files = blob_db_impl->TEST_GetBlobFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles();
auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
ASSERT_EQ(1, obsolete_files.size());
ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
@ -809,16 +806,85 @@ TEST_F(BlobDBTest, ReadWhileGC) {
SyncPoint::GetInstance()->DisableProcessing();
// The file is deleted this time
blob_db_impl->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl->TEST_GetBlobFiles();
blob_db_impl()->TEST_DeleteObsoleteFiles();
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
ASSERT_EQ(0, blob_db_impl->TEST_GetObsoleteFiles().size());
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
VerifyDB({{"foo", "bar"}});
Destroy();
}
}
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
// i = when to take snapshot
for (int i = 0; i < 4; i++) {
for (bool delete_key : {true, false}) {
const Snapshot *snapshot = nullptr;
Destroy();
Open(bdb_options);
// First file
ASSERT_OK(Put("key1", "value"));
if (i == 0) {
snapshot = blob_db_->GetSnapshot();
}
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
// Second file
ASSERT_OK(Put("key2", "value"));
if (i == 1) {
snapshot = blob_db_->GetSnapshot();
}
blob_files = blob_db_impl()->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
auto bfile = blob_files[1];
ASSERT_FALSE(bfile->Immutable());
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
// Third file
ASSERT_OK(Put("key3", "value"));
if (i == 2) {
snapshot = blob_db_->GetSnapshot();
}
if (delete_key) {
Delete("key2");
}
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_TRUE(bfile->Obsolete());
ASSERT_EQ(1, gc_stats.blob_count);
if (delete_key) {
ASSERT_EQ(0, gc_stats.num_relocate);
ASSERT_EQ(bfile->GetSequenceRange().second + 1,
bfile->GetObsoleteSequence());
} else {
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
}
if (i == 3) {
snapshot = blob_db_->GetSnapshot();
}
size_t num_files = delete_key ? 3 : 4;
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (i == 0 || i == 3 || (i == 2 && delete_key)) {
// The snapshot shouldn't see data in bfile
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
} else {
// The snapshot will see data in bfile, so the file shouldn't be deleted
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
blob_db_impl()->TEST_DeleteObsoleteFiles();
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
}
}
}
}
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
Options options;
options.env = mock_env_.get();
@ -962,7 +1028,7 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) {
bdb_options.is_fifo = true;
Open(bdb_options);
// Each stored blob has an overhead of about 32 bytes currently.
// Each stored blob has an overhead of 32 bytes currently.
// So a 100 byte blob should take up 132 bytes.
std::string value(100, 'v');
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));

@ -36,7 +36,7 @@ BlobFile::BlobFile()
deleted_count_(0),
deleted_size_(0),
closed_(false),
can_be_deleted_(false),
obsolete_(false),
gc_once_after_open_(false),
expiration_range_({0, 0}),
sequence_range_({kMaxSequenceNumber, 0}),
@ -55,7 +55,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
deleted_count_(0),
deleted_size_(0),
closed_(false),
can_be_deleted_(false),
obsolete_(false),
gc_once_after_open_(false),
expiration_range_({0, 0}),
sequence_range_({kMaxSequenceNumber, 0}),
@ -64,7 +64,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
header_valid_(false) {}
BlobFile::~BlobFile() {
if (can_be_deleted_) {
if (obsolete_) {
std::string pn(PathName());
Status s = Env::Default()->DeleteFile(PathName());
if (!s.ok()) {
@ -110,17 +110,21 @@ std::string BlobFile::DumpState() const {
"path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64
" file_size: %" PRIu64 " deleted_count: %" PRIu64
" deleted_size: %" PRIu64
" closed: %d can_be_deleted: %d expiration_range: (%" PRIu64
", %" PRIu64 ") sequence_range: (%" PRIu64 " %" PRIu64
"), writer: %d reader: %d",
" closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
path_to_dir_.c_str(), file_number_, blob_count_.load(),
gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
closed_.load(), can_be_deleted_.load(), expiration_range_.first,
closed_.load(), obsolete_.load(), expiration_range_.first,
expiration_range_.second, sequence_range_.first,
sequence_range_.second, (!!log_writer_), (!!ra_file_reader_));
return str;
}
void BlobFile::MarkObsolete(SequenceNumber sequence) {
obsolete_sequence_ = sequence;
obsolete_.store(true);
}
bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
assert(last_fsync_ <= file_size_);
return (hard) ? file_size_ > last_fsync_

@ -63,8 +63,12 @@ class BlobFile {
std::atomic<bool> closed_;
// has a pass of garbage collection successfully finished on this file
// can_be_deleted_ still needs to do iterator/snapshot checks
std::atomic<bool> can_be_deleted_;
// obsolete_ still needs to do iterator/snapshot checks
std::atomic<bool> obsolete_;
// The last sequence number by the time the file marked as obsolete.
// Data in this file is visible to a snapshot taken before the sequence.
SequenceNumber obsolete_sequence_;
// should this file been gc'd once to reconcile lost deletes/compactions
std::atomic<bool> gc_once_after_open_;
@ -91,6 +95,8 @@ class BlobFile {
bool header_valid_;
SequenceNumber garbage_collection_finish_sequence_;
public:
BlobFile();
@ -117,7 +123,19 @@ class BlobFile {
std::string DumpState() const;
// if the file has gone through GC and blobs have been relocated
bool Obsolete() const { return can_be_deleted_.load(); }
bool Obsolete() const {
assert(Immutable() || !obsolete_.load());
return obsolete_.load();
}
// Mark file as obsolete by garbage collection. The file is not visible to
// snapshots with sequence greater or equal to the given sequence.
void MarkObsolete(SequenceNumber sequence);
SequenceNumber GetObsoleteSequence() const {
assert(Obsolete());
return obsolete_sequence_;
}
// if the file is not taking any more appends.
bool Immutable() const { return closed_.load(); }
@ -125,6 +143,8 @@ class BlobFile {
// we will assume this is atomic
bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
void Fsync();
uint64_t GetFileSize() const {
return file_size_.load(std::memory_order_acquire);
}
@ -155,8 +175,6 @@ class BlobFile {
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
void Fsync();
private:
std::shared_ptr<Reader> OpenSequentialReader(
Env* env, const DBOptions& db_options,
@ -183,8 +201,6 @@ class BlobFile {
void SetFileSize(uint64_t fs) { file_size_ = fs; }
void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
void SetCanBeDeleted() { can_be_deleted_ = true; }
};
} // namespace blob_db
} // namespace rocksdb

Loading…
Cancel
Save