BlobDB: Remove the need to get sequence number per write

Summary:
Previously we store sequence number range of each blob files, and use the sequence number range to check if the file can be possibly visible by a snapshot. But it adds complexity to the code, since the sequence number is only available after a write. (The current implementation get sequence number by calling GetLatestSequenceNumber(), which is wrong.) With the patch, we are not storing sequence number range, and check if snapshot_sequence < obsolete_sequence to decide if the file is visible by a snapshot (previously we check if first_sequence <= snapshot_sequence < obsolete_sequence).
Closes https://github.com/facebook/rocksdb/pull/3274

Differential Revision: D6571497

Pulled By: yiwu-arbug

fbshipit-source-id: ca06479dc1fcd8782f6525b62b7762cd47d61909
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent a79c7c05e8
commit 237b292515
  1. 6
      db/db_impl.cc
  2. 4
      db/db_impl.h
  3. 46
      db/db_write_test.cc
  4. 16
      db/snapshot_impl.h
  5. 60
      utilities/blob_db/blob_db_impl.cc
  6. 2
      utilities/blob_db/blob_db_impl.h
  7. 25
      utilities/blob_db/blob_db_test.cc
  8. 2
      utilities/blob_db/blob_dump_tool.cc
  9. 10
      utilities/blob_db/blob_file.cc
  10. 13
      utilities/blob_db/blob_file.h
  11. 8
      utilities/blob_db/blob_log_format.cc
  12. 16
      utilities/blob_db/blob_log_format.h

@ -1682,12 +1682,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
delete casted_s;
}
bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound) {
InstrumentedMutexLock l(&mutex_);
return snapshots_.HasSnapshotInRange(lower_bound, upper_bound);
}
#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
TablePropertiesCollection* props) {

@ -229,10 +229,6 @@ class DBImpl : public DB {
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
// Whether there is an active snapshot in range [lower_bound, upper_bound).
bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound);
#ifndef ROCKSDB_LITE
using DB::ResetStats;
virtual Status ResetStats() override;

@ -39,52 +39,6 @@ TEST_P(DBWriteTest, SyncAndDisableWAL) {
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
// Sequence number should be return through input write batch.
TEST_P(DBWriteTest, ReturnSeuqneceNumber) {
Random rnd(4422);
Open();
for (int i = 0; i < 100; i++) {
WriteBatch batch;
batch.Put("key" + ToString(i), test::RandomHumanReadableString(&rnd, 10));
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(),
WriteBatchInternal::Sequence(&batch));
}
}
TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
constexpr size_t kThreads = 16;
constexpr size_t kNumKeys = 1000;
Open();
ASSERT_EQ(0, dbfull()->GetLatestSequenceNumber());
// Check each sequence is used once and only once.
std::vector<std::atomic_flag> flags(kNumKeys * kThreads + 1);
for (size_t i = 0; i < flags.size(); i++) {
flags[i].clear();
}
auto writer = [&](size_t id) {
Random rnd(4422 + static_cast<uint32_t>(id));
for (size_t k = 0; k < kNumKeys; k++) {
WriteBatch batch;
batch.Put("key" + ToString(id) + "-" + ToString(k),
test::RandomHumanReadableString(&rnd, 10));
ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
ASSERT_GT(sequence, 0);
ASSERT_LE(sequence, kNumKeys * kThreads);
// The sequence isn't consumed by someone else.
ASSERT_FALSE(flags[sequence].test_and_set());
}
};
std::vector<port::Thread> threads;
for (size_t i = 0; i < kThreads; i++) {
threads.emplace_back(writer, i);
}
for (size_t i = 0; i < kThreads; i++) {
threads[i].join();
}
}
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr int kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(

@ -108,22 +108,6 @@ class SnapshotList {
return ret;
}
// Whether there is an active snapshot in range [lower_bound, upper_bound).
bool HasSnapshotInRange(SequenceNumber lower_bound,
SequenceNumber upper_bound) {
if (empty()) {
return false;
}
const SnapshotImpl* s = &list_;
while (s->next_ != &list_) {
if (s->next_->number_ >= lower_bound) {
return s->next_->number_ < upper_bound;
}
s = s->next_;
}
return false;
}
// get the sequence number of the most recent snapshot
SequenceNumber GetNewest() {
if (empty()) {

@ -571,18 +571,14 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
const WriteOptions& options_;
BlobDBImpl* blob_db_impl_;
uint32_t default_cf_id_;
SequenceNumber sequence_;
WriteBatch batch_;
public:
BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
uint32_t default_cf_id, SequenceNumber seq)
uint32_t default_cf_id)
: options_(options),
blob_db_impl_(blob_db_impl),
default_cf_id_(default_cf_id),
sequence_(seq) {}
SequenceNumber sequence() { return sequence_; }
default_cf_id_(default_cf_id) {}
WriteBatch* batch() { return &batch_; }
@ -597,8 +593,7 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
uint64_t expiration =
blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
expiration, sequence_, &batch_);
sequence_++;
expiration, &batch_);
return s;
}
@ -609,7 +604,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
"Blob DB doesn't support non-default column family.");
}
Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
sequence_++;
return s;
}
@ -621,7 +615,6 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
}
Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
begin_key, end_key);
sequence_++;
return s;
}
@ -643,12 +636,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
RecordTick(statistics_, BLOB_DB_NUM_WRITE);
uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
Status s;
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
BlobInserter blob_inserter(options, this, default_cf_id);
{
// Release write_mutex_ before DB write to avoid race condition with
// flush begin listener, which also require write_mutex_ to sync
@ -693,6 +682,8 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (bdb_options_.enable_garbage_collection) {
// add deleted key to list of keys that have been deleted for book-keeping
SequenceNumber current_seq =
WriteBatchInternal::Sequence(blob_inserter.batch());
DeleteBookkeeper delete_bookkeeper(this, current_seq);
s = updates->Iterate(&delete_bookkeeper);
}
@ -761,11 +752,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
// flush begin listener, which also require write_mutex_ to sync
// blob files.
MutexLock l(&write_mutex_);
// TODO(yiwu): In case there are multiple writers the latest sequence would
// not be the actually sequence we are writting. Need to get the sequence
// from write batch after DB write instead.
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
s = PutBlobValue(options, key, value, expiration, sequence, &batch);
s = PutBlobValue(options, key, value, expiration, &batch);
}
if (s.ok()) {
s = db_->Write(options, &batch);
@ -776,7 +763,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration,
SequenceNumber sequence, WriteBatch* batch) {
WriteBatch* batch) {
Status s;
std::string index_entry;
uint32_t column_family_id =
@ -817,7 +804,6 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
}
if (s.ok()) {
bfile->ExtendSequenceRange(sequence);
if (expiration != kNoExpiration) {
bfile->ExtendExpirationRange(expiration);
}
@ -898,7 +884,7 @@ bool BlobDBImpl::EvictOldestBlobFile() {
total_blob_space_.load(), bdb_options_.blob_dir_size,
oldest_file->BlobFileNumber(), expiration_range.first,
expiration_range.second);
oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second);
oldest_file->MarkObsolete(GetLatestSequenceNumber());
obsolete_files_.push_back(oldest_file);
oldest_file_evicted_.store(true);
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
@ -1271,9 +1257,26 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
bool BlobDBImpl::VisibleToActiveSnapshot(
const std::shared_ptr<BlobFile>& bfile) {
assert(bfile->Obsolete());
SequenceNumber first_sequence = bfile->GetSequenceRange().first;
// We check whether the oldest snapshot is no less than the last sequence
// by the time the blob file become obsolete. If so, the blob file is not
// visible to all existing snapshots.
//
// If we keep track of the earliest sequence of the keys in the blob file,
// we could instead check if there's a snapshot falls in range
// [earliest_sequence, obsolete_sequence). But doing so will make the
// implementation more complicated.
SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence);
SequenceNumber oldest_snapshot = 0;
{
// Need to lock DBImpl mutex before access snapshot list.
InstrumentedMutexLock l(db_impl_->mutex());
auto snapshots = db_impl_->snapshots();
if (!snapshots.empty()) {
oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
}
}
return oldest_snapshot < obsolete_sequence;
}
bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
@ -1757,8 +1760,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
&rewrite_batch, &callback);
}
if (rewrite_status.ok()) {
newfile->ExtendSequenceRange(
WriteBatchInternal::Sequence(&rewrite_batch));
gc_stats->num_keys_relocated++;
gc_stats->bytes_relocated += record.record_size();
} else if (rewrite_status.IsBusy()) {
@ -1775,10 +1776,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
} // end of ReadRecord loop
if (s.ok()) {
SequenceNumber obsolete_sequence =
newfile == nullptr ? bfptr->GetSequenceRange().second + 1
: newfile->GetSequenceRange().second;
bfptr->MarkObsolete(obsolete_sequence);
bfptr->MarkObsolete(GetLatestSequenceNumber());
if (!first_gc) {
WriteLock wl(&mutex_);
obsolete_files_.push_back(bfptr);

@ -284,7 +284,7 @@ class BlobDBImpl : public BlobDB {
Status PutBlobValue(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration,
SequenceNumber sequence, WriteBatch* batch);
WriteBatch* batch);
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,

@ -901,20 +901,18 @@ TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
ASSERT_EQ(1, gc_stats.blob_count);
if (delete_key) {
ASSERT_EQ(0, gc_stats.num_keys_relocated);
ASSERT_EQ(bfile->GetSequenceRange().second + 1,
bfile->GetObsoleteSequence());
} else {
ASSERT_EQ(1, gc_stats.num_keys_relocated);
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
}
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
if (i == 3) {
snapshot = blob_db_->GetSnapshot();
}
size_t num_files = delete_key ? 3 : 4;
ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_impl()->TEST_DeleteObsoleteFiles();
if (i == 0 || i == 3 || (i == 2 && delete_key)) {
if (i == 3) {
// The snapshot shouldn't see data in bfile
ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
blob_db_->ReleaseSnapshot(snapshot);
@ -1111,10 +1109,6 @@ TEST_F(BlobDBTest, InlineSmallValues) {
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, KeyVersion> versions;
SequenceNumber first_non_ttl_seq = kMaxSequenceNumber;
SequenceNumber first_ttl_seq = kMaxSequenceNumber;
SequenceNumber last_non_ttl_seq = 0;
SequenceNumber last_ttl_seq = 0;
for (size_t i = 0; i < 1000; i++) {
bool is_small_value = rnd.Next() % 2;
bool has_ttl = rnd.Next() % 2;
@ -1134,15 +1128,6 @@ TEST_F(BlobDBTest, InlineSmallValues) {
versions[key] =
KeyVersion(key, value, sequence,
(is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
if (!is_small_value) {
if (!has_ttl) {
first_non_ttl_seq = std::min(first_non_ttl_seq, sequence);
last_non_ttl_seq = std::max(last_non_ttl_seq, sequence);
} else {
first_ttl_seq = std::min(first_ttl_seq, sequence);
last_ttl_seq = std::max(last_ttl_seq, sequence);
}
}
}
VerifyDB(data);
VerifyBaseDB(versions);
@ -1159,11 +1144,7 @@ TEST_F(BlobDBTest, InlineSmallValues) {
ttl_file = blob_files[1];
}
ASSERT_FALSE(non_ttl_file->HasTTL());
ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSequenceRange().first);
ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSequenceRange().second);
ASSERT_TRUE(ttl_file->HasTTL());
ASSERT_EQ(first_ttl_seq, ttl_file->GetSequenceRange().first);
ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second);
}
TEST_F(BlobDBTest, CompactionFilterNotSupported) {

@ -142,8 +142,6 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count);
fprintf(stdout, " Expiration Range : %s\n",
GetString(footer.expiration_range).c_str());
fprintf(stdout, " Sequence Range : %s\n",
GetString(footer.sequence_range).c_str());
return s;
}

@ -43,7 +43,6 @@ BlobFile::BlobFile()
obsolete_(false),
gc_once_after_open_(false),
expiration_range_({0, 0}),
sequence_range_({kMaxSequenceNumber, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false),
@ -67,7 +66,6 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
obsolete_(false),
gc_once_after_open_(false),
expiration_range_({0, 0}),
sequence_range_({kMaxSequenceNumber, 0}),
last_access_(-1),
last_fsync_(0),
header_valid_(false),
@ -116,12 +114,11 @@ std::string BlobFile::DumpState() const {
" file_size: %" PRIu64 " deleted_count: %" PRIu64
" deleted_size: %" PRIu64
" closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
"), writer: %d reader: %d",
path_to_dir_.c_str(), file_number_, blob_count_.load(),
gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
closed_.load(), obsolete_.load(), expiration_range_.first,
expiration_range_.second, sequence_range_.first,
sequence_range_.second, (!!log_writer_), (!!ra_file_reader_));
expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
return str;
}
@ -144,8 +141,6 @@ Status BlobFile::WriteFooterAndCloseLocked() {
footer.expiration_range = expiration_range_;
}
footer.sequence_range = sequence_range_;
// this will close the file and reset the Writable File Pointer.
Status s = log_writer_->AppendFooter(footer);
if (s.ok()) {
@ -185,7 +180,6 @@ Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
last_fsync_.store(file_size_);
blob_count_ = footer.blob_count;
expiration_range_ = footer.expiration_range;
sequence_range_ = footer.sequence_range;
closed_ = true;
return Status::OK();
}

@ -84,8 +84,6 @@ class BlobFile {
ExpirationRange expiration_range_;
SequenceRange sequence_range_;
// Sequential/Append writer for blobs
std::shared_ptr<Writer> log_writer_;
@ -177,17 +175,6 @@ class BlobFile {
expiration_range_.second = std::max(expiration_range_.second, expiration);
}
SequenceRange GetSequenceRange() const { return sequence_range_; }
void SetSequenceRange(SequenceRange sequence_range) {
sequence_range_ = sequence_range;
}
void ExtendSequenceRange(SequenceNumber sequence) {
sequence_range_.first = std::min(sequence_range_.first, sequence);
sequence_range_.second = std::max(sequence_range_.second, sequence);
}
bool HasTTL() const { return has_ttl_; }
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }

@ -67,8 +67,6 @@ void BlobLogFooter::EncodeTo(std::string* dst) {
PutFixed64(dst, blob_count);
PutFixed64(dst, expiration_range.first);
PutFixed64(dst, expiration_range.second);
PutFixed64(dst, sequence_range.first);
PutFixed64(dst, sequence_range.second);
crc = crc32c::Value(dst->c_str(), dst->size());
crc = crc32c::Mask(crc);
PutFixed32(dst, crc);
@ -82,14 +80,12 @@ Status BlobLogFooter::DecodeFrom(Slice src) {
"Unexpected blob file footer size");
}
uint32_t src_crc = 0;
src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - 4);
src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - sizeof(uint32_t));
src_crc = crc32c::Mask(src_crc);
uint32_t magic_number;
if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) ||
!GetFixed64(&src, &expiration_range.first) ||
!GetFixed64(&src, &expiration_range.second) ||
!GetFixed64(&src, &sequence_range.first) ||
!GetFixed64(&src, &sequence_range.second) || !GetFixed32(&src, &crc)) {
!GetFixed64(&src, &expiration_range.second) || !GetFixed32(&src, &crc)) {
return Status::Corruption(kErrorMessage, "Error decoding content");
}
if (magic_number != kMagicNumber) {

@ -24,7 +24,6 @@ constexpr uint32_t kVersion1 = 1;
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
using ExpirationRange = std::pair<uint64_t, uint64_t>;
using SequenceRange = std::pair<uint64_t, uint64_t>;
// Format of blob log file header (30 bytes):
//
@ -53,24 +52,23 @@ struct BlobLogHeader {
Status DecodeFrom(Slice slice);
};
// Format of blob log file footer (48 bytes):
// Format of blob log file footer (32 bytes):
//
// +--------------+------------+-------------------+-------------------+------------+
// | magic number | blob count | expiration range | sequence range | footer CRC |
// +--------------+------------+-------------------+-------------------+------------+
// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed64 + Fixed64 | Fixed32 |
// +--------------+------------+-------------------+-------------------+------------+
// +--------------+------------+-------------------+------------+
// | magic number | blob count | expiration range | footer CRC |
// +--------------+------------+-------------------+------------+
// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed32 |
// +--------------+------------+-------------------+------------+
//
// The footer will be presented only when the blob file is properly closed.
//
// Unlike the same field in file header, expiration range in the footer is the
// range of smallest and largest expiration of the data in this file.
struct BlobLogFooter {
static constexpr size_t kSize = 48;
static constexpr size_t kSize = 32;
uint64_t blob_count = 0;
ExpirationRange expiration_range = std::make_pair(0, 0);
SequenceRange sequence_range = std::make_pair(0, 0);
uint32_t crc = 0;
void EncodeTo(std::string* dst);

Loading…
Cancel
Save