Blob DB: Inline small values in base DB

Summary:
Add the `min_blob_size` option to allow storing small values in the base DB (i.e. in the LSM tree) together with the key. The goal is to improve performance for small values, while still taking advantage of Blob DB's low write amplification for large values.

Also add an expiration timestamp to the blob index. It will be useful for evicting stale blob indexes from the base DB via a compaction filter; I'll work on the compaction filter in future patches.

See blob_index.h for the new blob index format. There are four cases when writing a new key (a simplified sketch of the dispatch follows this list):
* small value w/o TTL: put in base DB as a normal value (i.e. ValueType::kTypeValue).
* small value w/ TTL: put (type, expiration, value) in base DB.
* large value w/o TTL: write value to blob log and put (type, file, offset, size, compression) in base DB.
* large value w/ TTL: write value to blob log and put (type, expiration, file, offset, size, compression) in base DB.
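For illustration only, here is a minimal, self-contained C++ sketch of this four-way dispatch. It is not the patch's code: Tag, PutVarint64, and EncodeBaseDBEntry below are simplified stand-ins for the helpers in util/coding.h and the encoders in blob_index.h shown later in this diff.

#include <cstdint>
#include <limits>
#include <string>

namespace {

constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
enum class Tag : char { kInlinedTTL = 0, kBlob = 1, kBlobTTL = 2 };

// Little-endian base-128 varint, as in util/coding.h.
void PutVarint64(std::string* dst, uint64_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Returns the bytes written to the base DB for one key. file_number/offset
// describe where the blob landed in the blob log when the value is large.
std::string EncodeBaseDBEntry(const std::string& value, uint64_t expiration,
                              uint64_t min_blob_size, uint64_t file_number,
                              uint64_t offset, char compression) {
  std::string out;
  if (value.size() < min_blob_size) {
    if (expiration == kNoExpiration) {
      return value;  // case 1: stored as a plain kTypeValue
    }
    out.push_back(static_cast<char>(Tag::kInlinedTTL));  // case 2
    PutVarint64(&out, expiration);
    out.append(value);
    return out;
  }
  if (expiration == kNoExpiration) {
    out.push_back(static_cast<char>(Tag::kBlob));  // case 3
  } else {
    out.push_back(static_cast<char>(Tag::kBlobTTL));  // case 4
    PutVarint64(&out, expiration);
  }
  PutVarint64(&out, file_number);
  PutVarint64(&out, offset);
  PutVarint64(&out, value.size());
  out.push_back(compression);
  return out;
}

}  // namespace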
Closes https://github.com/facebook/rocksdb/pull/3066

Differential Revision: D6142115

Pulled By: yiwu-arbug

fbshipit-source-id: 9526e76e19f0839310a3f5f2a43772a4ad182cd0
Branch: main
Author: Yi Wu, committed by Facebook GitHub Bot
Commit: 5a2a6483dc (parent: 9b18cc2363)

Files changed (line counts):
* db/db_impl.cc (2)
* include/rocksdb/utilities/debug.h (2)
* utilities/blob_db/blob_db.h (4)
* utilities/blob_db/blob_db_impl.cc (555)
* utilities/blob_db/blob_db_impl.h (26)
* utilities/blob_db/blob_db_test.cc (135)
* utilities/blob_db/blob_file.cc (5)
* utilities/blob_db/blob_index.h (161)

@ -1641,7 +1641,7 @@ bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
if (snapshots_.empty()) {
return false;
}
- return (snapshots_.newest()->GetSequenceNumber() > sn);
return (snapshots_.newest()->GetSequenceNumber() >= sn);
}
#ifndef ROCKSDB_LITE

@ -16,6 +16,8 @@ namespace rocksdb {
// store multiple versions of the same user key due to snapshots, compaction not
// happening yet, etc.
struct KeyVersion {
KeyVersion() : user_key(""), value(""), sequence(0), type(0) {}
KeyVersion(const std::string& _user_key, const std::string& _value,
SequenceNumber _sequence, int _type)
: user_key(_user_key), value(_value), sequence(_sequence), type(_type) {}
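The default constructor added above makes KeyVersion usable as a map value, which the new VerifyBaseDB test helper (below) relies on. A usage sketch of GetAllKeyVersions, assuming the signature declared in this header; empty begin/end keys scan the whole DB, as the blob_db test does:

#include <cinttypes>
#include <cstdio>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/utilities/debug.h"

// Print every stored version of every key, including entries whose type is
// kTypeBlobIndex.
void DumpAllVersions(rocksdb::DB* db) {
  std::vector<rocksdb::KeyVersion> versions;
  rocksdb::Status s = rocksdb::GetAllKeyVersions(db, "", "", &versions);
  if (!s.ok()) {
    return;
  }
  for (const auto& kv : versions) {
    printf("key=%s seq=%" PRIu64 " type=%d\n", kv.user_key.c_str(),
           kv.sequence, kv.type);
  }
}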

@ -52,6 +52,10 @@ struct BlobDBOptions {
// and so on
uint64_t ttl_range_secs = 3600;
// The smallest value to store in blob log. Values smaller than this
// threshold will be inlined in base DB together with the key.
uint64_t min_blob_size = 0;
// at what bytes will the blob files be synced to blob log.
uint64_t bytes_per_sync = 0;
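As a usage sketch, opening a Blob DB that inlines values under 4KB. The path and threshold are illustrative; Open's signature is assumed to be as declared in blob_db.h:

#include "rocksdb/options.h"
#include "utilities/blob_db/blob_db.h"

rocksdb::Status OpenWithInlining(rocksdb::blob_db::BlobDB** blob_db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::blob_db::BlobDBOptions bdb_options;
  // Values shorter than 4096 bytes stay in the LSM tree with the key;
  // longer values are written to the blob log.
  bdb_options.min_blob_size = 4096;
  return rocksdb::blob_db::BlobDB::Open(options, bdb_options,
                                        "/tmp/blob_db_example", blob_db);
}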

@ -33,6 +33,7 @@
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_db_iterator.h"
#include "utilities/blob_db/blob_index.h"
namespace {
int kBlockBasedTableVersionFormat = 2;
@ -49,78 +50,8 @@ void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) {
} // end namespace
namespace rocksdb {
namespace blob_db {
- // BlobHandle is a pointer to the blob that is stored in the LSM
- class BlobHandle {
- public:
- BlobHandle()
- : file_number_(std::numeric_limits<uint64_t>::max()),
- offset_(std::numeric_limits<uint64_t>::max()),
- size_(std::numeric_limits<uint64_t>::max()),
- compression_(kNoCompression) {}
- uint64_t filenumber() const { return file_number_; }
- void set_filenumber(uint64_t fn) { file_number_ = fn; }
- // The offset of the block in the file.
- uint64_t offset() const { return offset_; }
- void set_offset(uint64_t _offset) { offset_ = _offset; }
- // The size of the stored block
- uint64_t size() const { return size_; }
- void set_size(uint64_t _size) { size_ = _size; }
- CompressionType compression() const { return compression_; }
- void set_compression(CompressionType t) { compression_ = t; }
- void EncodeTo(std::string* dst) const;
- Status DecodeFrom(const Slice& input);
- void clear();
- private:
- uint64_t file_number_;
- uint64_t offset_;
- uint64_t size_;
- CompressionType compression_;
- };
- void BlobHandle::EncodeTo(std::string* dst) const {
- // Sanity check that all fields have been set
- assert(offset_ != std::numeric_limits<uint64_t>::max());
- assert(size_ != std::numeric_limits<uint64_t>::max());
- assert(file_number_ != std::numeric_limits<uint64_t>::max());
- dst->reserve(30);
- PutVarint64(dst, file_number_);
- PutVarint64(dst, offset_);
- PutVarint64(dst, size_);
- dst->push_back(static_cast<unsigned char>(compression_));
- }
- void BlobHandle::clear() {
- file_number_ = std::numeric_limits<uint64_t>::max();
- offset_ = std::numeric_limits<uint64_t>::max();
- size_ = std::numeric_limits<uint64_t>::max();
- compression_ = kNoCompression;
- }
- Status BlobHandle::DecodeFrom(const Slice& input) {
- Slice s(input);
- Slice* p = &s;
- if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) &&
- GetVarint64(p, &size_)) {
- compression_ = static_cast<CompressionType>(p->data()[0]);
- return Status::OK();
- } else {
- clear();
- return Status::Corruption("bad blob handle");
- }
- }
Random blob_rgen(static_cast<uint32_t>(time(nullptr)));
void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) {
@ -149,19 +80,20 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
if (!is_new &&
value_type ==
CompactionEventListener::CompactionListenerValueType::kValue) {
- BlobHandle handle;
- Status s = handle.DecodeFrom(existing_value);
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(existing_value);
if (s.ok()) {
if (impl_->debug_level_ >= 3)
- ROCKS_LOG_INFO(impl_->db_options_.info_log,
- "CALLBACK COMPACTED OUT KEY: %s SN: %d "
- "NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64
- " SIZE: %" PRIu64,
- key.ToString().c_str(), sn, is_new, handle.filenumber(),
- handle.offset(), handle.size());
- impl_->override_vals_q_.enqueue({handle.filenumber(), key.size(),
- handle.offset(), handle.size(), sn});
ROCKS_LOG_INFO(
impl_->db_options_.info_log,
"CALLBACK COMPACTED OUT KEY: %s SN: %d "
"NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64 " SIZE: %" PRIu64,
key.ToString().c_str(), sn, is_new, blob_index.file_number(),
blob_index.offset(), blob_index.size());
impl_->override_vals_q_.enqueue({blob_index.file_number(), key.size(),
blob_index.offset(), blob_index.size(),
sn});
}
} else {
if (impl_->debug_level_ >= 3)
@ -178,7 +110,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
db_impl_(nullptr),
env_(db_options.env),
ttl_extractor_(blob_db_options.ttl_extractor.get()),
- wo_set_(false),
bdb_options_(blob_db_options),
db_options_(db_options),
env_options_(db_options),
@ -235,7 +166,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
: BlobDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
- wo_set_(false),
bdb_options_(blob_db_options),
db_options_(db->GetOptions()),
env_options_(db_->GetOptions()),
@ -610,17 +540,6 @@ std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
return writer;
}
- void BlobDBImpl::UpdateWriteOptions(const WriteOptions& options) {
- if (!wo_set_.load(std::memory_order_relaxed)) {
- // DCLP
- WriteLock wl(&mutex_);
- if (!wo_set_.load(std::memory_order_acquire)) {
- wo_set_.store(true, std::memory_order_release);
- write_options_ = options;
- }
- }
- }
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
uint32_t val = blob_rgen.Next();
{
@ -736,14 +655,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
return bfile;
}
- Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
- const Slice& value) {
- std::string new_value;
- Slice value_slice;
- uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
- return PutUntil(options, key, value_slice, expiration);
- }
Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
Status s = db_->Delete(options, key);
@ -753,141 +664,94 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
return s;
}
- Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
- class BlobInserter : public WriteBatch::Handler {
- private:
- BlobDBImpl* impl_;
- SequenceNumber sequence_;
- WriteBatch updates_blob_;
- std::shared_ptr<BlobFile> last_file_;
- bool has_put_;
- std::string new_value_;
- uint32_t default_cf_id_;
- public:
- BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
- : impl_(impl),
- sequence_(seq),
- has_put_(false),
- default_cf_id_(reinterpret_cast<ColumnFamilyHandleImpl*>(
- impl_->DefaultColumnFamily())
- ->cfd()
- ->GetID()) {}
- SequenceNumber sequence() { return sequence_; }
- WriteBatch* updates_blob() { return &updates_blob_; }
- std::shared_ptr<BlobFile>& last_file() { return last_file_; }
- bool has_put() { return has_put_; }
- virtual Status PutCF(uint32_t column_family_id, const Slice& key,
- const Slice& value_slice) override {
- if (column_family_id != default_cf_id_) {
- return Status::NotSupported(
- "Blob DB doesn't support non-default column family.");
- }
- Slice value_unc;
- uint64_t expiration =
- impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
- std::shared_ptr<BlobFile> bfile =
- (expiration != kNoExpiration)
- ? impl_->SelectBlobFileTTL(expiration)
- : ((last_file_) ? last_file_ : impl_->SelectBlobFile());
- if (last_file_ && last_file_ != bfile) {
- return Status::NotFound("too many blob files");
- }
- if (!bfile) {
- return Status::NotFound("blob file not found");
- }
- last_file_ = bfile;
- has_put_ = true;
- std::string compression_output;
- Slice value = impl_->GetCompressedSlice(value_unc, &compression_output);
- std::string headerbuf;
- Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
- std::string index_entry;
- Status s = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
- if (!s.ok()) {
- return s;
- }
- bfile->ExtendSequenceRange(sequence_);
- sequence_++;
- if (expiration != kNoExpiration) {
- extendTTL(&(bfile->ttl_range_), expiration);
- }
class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
private:
const WriteOptions& options_;
BlobDBImpl* blob_db_impl_;
uint32_t default_cf_id_;
SequenceNumber sequence_;
WriteBatch batch_;
- return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id,
- key, index_entry);
public:
BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
uint32_t default_cf_id, SequenceNumber seq)
: options_(options),
blob_db_impl_(blob_db_impl),
default_cf_id_(default_cf_id),
sequence_(seq) {}
SequenceNumber sequence() { return sequence_; }
WriteBatch* batch() { return &batch_; }
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (column_family_id != default_cf_id_) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
std::string new_value;
Slice value_slice;
uint64_t expiration =
blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
expiration, sequence_, &batch_);
sequence_++;
return s;
}
- virtual Status DeleteCF(uint32_t column_family_id,
- const Slice& key) override {
- if (column_family_id != default_cf_id_) {
- return Status::NotSupported(
- "Blob DB doesn't support non-default column family.");
- }
- WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
- sequence_++;
- return Status::OK();
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (column_family_id != default_cf_id_) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
sequence_++;
return s;
}
- virtual Status DeleteRange(uint32_t column_family_id,
- const Slice& begin_key, const Slice& end_key) {
- if (column_family_id != default_cf_id_) {
- return Status::NotSupported(
- "Blob DB doesn't support non-default column family.");
- }
- WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id,
- begin_key, end_key);
- sequence_++;
- return Status::OK();
virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
const Slice& end_key) {
if (column_family_id != default_cf_id_) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
}
Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
begin_key, end_key);
sequence_++;
return s;
}
- virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
- const Slice& /*key*/) override {
- return Status::NotSupported("Not supported operation in blob db.");
- }
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in blob db.");
}
- virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
- const Slice& /*value*/) override {
- return Status::NotSupported("Not supported operation in blob db.");
- }
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in blob db.");
}
- virtual void LogData(const Slice& blob) override {
- updates_blob_.PutLogData(blob);
- }
- };
virtual void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
};
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
MutexLock l(&write_mutex_);
- SequenceNumber current_seq = db_impl_->GetLatestSequenceNumber() + 1;
- BlobInserter blob_inserter(this, current_seq);
uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
Status s = updates->Iterate(&blob_inserter);
if (!s.ok()) {
return s;
}
- s = db_->Write(opts, blob_inserter.updates_blob());
s = db_->Write(options, blob_inserter.batch());
if (!s.ok()) {
return s;
}
- assert(current_seq ==
- WriteBatchInternal::Sequence(blob_inserter.updates_blob()));
- assert(blob_inserter.sequence() ==
- current_seq + WriteBatchInternal::Count(blob_inserter.updates_blob()));
- if (blob_inserter.has_put()) {
- s = CloseBlobFileIfNeeded(blob_inserter.last_file());
- if (!s.ok()) {
- return s;
- }
- }
assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
// add deleted key to list of keys that have been deleted for book-keeping
class DeleteBookkeeper : public WriteBatch::Handler {
@ -956,83 +820,106 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
}
}
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
std::string new_value;
Slice value_slice;
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
return PutUntil(options, key, value_slice, expiration);
}
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
const Slice& key, const Slice& value,
uint64_t ttl) {
uint64_t now = EpochNow();
- assert(std::numeric_limits<uint64_t>::max() - now > ttl);
- return PutUntil(options, key, value, now + ttl);
}
- Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
- std::string* compression_output) const {
- if (bdb_options_.compression == kNoCompression) {
- return raw;
- }
- CompressionType ct = bdb_options_.compression;
- CompressionOptions compression_opts;
- CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat,
- Slice(), compression_output);
- return *compression_output;
uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
return PutUntil(options, key, value, expiration);
}
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
- const Slice& value_unc, uint64_t expiration) {
- TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
const Slice& value, uint64_t expiration) {
MutexLock l(&write_mutex_);
- UpdateWriteOptions(options);
- std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
- ? SelectBlobFileTTL(expiration)
- : SelectBlobFile();
- if (!bfile) return Status::NotFound("Blob file not found");
- std::string compression_output;
- Slice value = GetCompressedSlice(value_unc, &compression_output);
- std::string headerbuf;
- Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
- std::string index_entry;
- Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry);
- if (!s.ok()) {
- ROCKS_LOG_ERROR(db_options_.info_log,
- "Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
- " status: '%s' blob_file: '%s'",
- bfile->PathName().c_str(), key.ToString().c_str(),
- value.size(), s.ToString().c_str(),
- bfile->DumpState().c_str());
- return s;
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
WriteBatch batch;
Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
if (s.ok()) {
s = db_->Write(options, &batch);
}
return s;
}
- WriteBatch batch;
Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration,
SequenceNumber sequence, WriteBatch* batch) {
TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Start");
Status s;
std::string index_entry;
uint32_t column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
- s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key,
- index_entry);
if (value.size() < bdb_options_.min_blob_size) {
if (expiration == kNoExpiration) {
// Put as normal value
s = batch->Put(key, value);
} else {
// Inlined with TTL
BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
index_entry);
}
} else {
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
? SelectBlobFileTTL(expiration)
: SelectBlobFile();
if (!bfile) {
return Status::NotFound("Blob file not found");
}
- // this goes to the base db and can be expensive
- if (s.ok()) {
- s = db_->Write(options, &batch);
- }
std::string compression_output;
Slice value_compressed = GetCompressedSlice(value, &compression_output);
- if (s.ok()) {
- // this is the sequence number of the write.
- SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
- bfile->ExtendSequenceRange(sn);
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration,
-1);
- if (expiration != kNoExpiration) {
- extendTTL(&(bfile->ttl_range_), expiration);
- }
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
&index_entry);
- s = CloseBlobFileIfNeeded(bfile);
if (s.ok()) {
bfile->ExtendSequenceRange(sequence);
if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration);
}
s = CloseBlobFileIfNeeded(bfile);
if (s.ok()) {
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
index_entry);
}
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
" status: '%s' blob_file: '%s'",
bfile->PathName().c_str(), key.ToString().c_str(),
value.size(), s.ToString().c_str(),
bfile->DumpState().c_str());
}
}
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Finish");
return s;
}
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
std::string* compression_output) const {
if (bdb_options_.compression == kNoCompression) {
return raw;
}
CompressionType ct = bdb_options_.compression;
CompressionOptions compression_opts;
CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat,
Slice(), compression_output);
return *compression_output;
}
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice,
std::string* new_value) {
@ -1049,7 +936,8 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
- const Slice& value, std::string* index_entry) {
const Slice& value, uint64_t expiration,
std::string* index_entry) {
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
if (bdb_options_.blob_dir_size > 0 &&
(total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) {
@ -1086,18 +974,14 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
last_period_write_ += size_put;
total_blob_space_ += size_put;
- BlobHandle handle;
- handle.set_filenumber(bfile->BlobFileNumber());
- handle.set_size(value.size());
- handle.set_offset(blob_offset);
- handle.set_compression(bdb_options_.compression);
- handle.EncodeTo(index_entry);
if (debug_level_ >= 3)
ROCKS_LOG_INFO(db_options_.info_log,
">Adding KEY FILE: %s: BC: %d OFFSET: %d SZ: %d",
bfile->PathName().c_str(), bfile->blob_count_.load(),
blob_offset, value.size());
if (expiration == kNoExpiration) {
BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
value.size(), bdb_options_.compression);
} else {
BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
blob_offset, value.size(),
bdb_options_.compression);
}
return s;
}
@ -1138,29 +1022,45 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value) {
assert(value != nullptr);
- BlobHandle handle;
- Status s = handle.DecodeFrom(index_entry);
- if (!s.ok()) return s;
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
return s;
}
if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
return Status::NotFound("Key expired");
}
if (blob_index.IsInlined()) {
// TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
// memory buffer to avoid extra copy.
value->PinSelf(blob_index.value());
return Status::OK();
}
if (blob_index.size() == 0) {
value->PinSelf("");
return Status::OK();
}
// offset has to have certain min, as we will read CRC
// later from the Blob Header, which needs to be also a
// valid offset.
- if (handle.offset() <
if (blob_index.offset() <
(BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) {
if (debug_level_ >= 2) {
- ROCKS_LOG_ERROR(
- db_options_.info_log,
- "Invalid blob handle file_number: %" PRIu64 " blob_offset: %" PRIu64
- " blob_size: %" PRIu64 " key: %s",
- handle.filenumber(), handle.offset(), handle.size(), key.data());
ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid blob index file_number: %" PRIu64
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
" key: %s",
blob_index.file_number(), blob_index.offset(),
blob_index.size(), key.data());
}
return Status::NotFound("Blob Not Found, although found in LSM");
return Status::NotFound("Invalid blob offset");
}
std::shared_ptr<BlobFile> bfile;
{
ReadLock rl(&mutex_);
- auto hitr = blob_files_.find(handle.filenumber());
auto hitr = blob_files_.find(blob_index.file_number());
// file was deleted
if (hitr == blob_files_.end()) {
@ -1170,7 +1070,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
bfile = hitr->second;
}
- if (handle.size() == 0 && value != nullptr) {
if (blob_index.size() == 0 && value != nullptr) {
value->PinSelf("");
return Status::OK();
}
@ -1186,19 +1086,19 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
}
// allocate the buffer. This is safe in C++11
- valueptr->resize(handle.size());
valueptr->resize(blob_index.size());
char* buffer = &(*valueptr)[0];
Slice blob_value;
- s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
- if (!s.ok() || blob_value.size() != handle.size()) {
s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, buffer);
if (!s.ok() || blob_value.size() != blob_index.size()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to read blob from file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
- bfile->PathName().c_str(), handle.offset(), handle.size(),
- static_cast<int>(blob_value.size()), key.data(),
- s.ToString().c_str());
bfile->PathName().c_str(), blob_index.offset(),
blob_index.size(), static_cast<int>(blob_value.size()),
key.data(), s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
}
@ -1208,15 +1108,15 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
std::string crc_str;
crc_str.resize(sizeof(uint32_t));
char* crc_buffer = &(crc_str[0]);
- s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
s = reader->Read(blob_index.offset() - (key.size() + sizeof(uint32_t)),
sizeof(uint32_t), &crc_slice, crc_buffer);
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
- bfile->PathName().c_str(), handle.offset(), handle.size(),
- key.data(), s.ToString().c_str());
bfile->PathName().c_str(), blob_index.offset(),
blob_index.size(), key.data(), s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
}
@ -1228,8 +1128,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob crc mismatch file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
- bfile->PathName().c_str(), handle.offset(), handle.size(),
- key.data(), s.ToString().c_str());
bfile->PathName().c_str(), blob_index.offset(),
blob_index.size(), key.data(), s.ToString().c_str());
}
return Status::Corruption("Corruption. Blob CRC mismatch");
}
@ -1358,8 +1258,9 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
SequenceNumber esn = bfile->GetSNRange().first;
- // this is not correct.
- // you want to check that there are no snapshots in the
// TODO(yiwu): Here we should check instead if there is an active snapshot
// lies between the first sequence in the file, and the last sequence by
// the time the file finished being garbage collected.
bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn);
if (notok) {
ROCKS_LOG_INFO(db_options_.info_log,
@ -1399,16 +1300,16 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
}
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
- BlobHandle handle;
- Status s = handle.DecodeFrom(index_entry);
BlobIndex blob_index;
Status s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Could not parse lsm val in MarkBlobDeleted %s",
index_entry.ToString().c_str());
return false;
}
- bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(),
- handle.offset(), handle.size());
bool succ = FindFileAndEvictABlob(blob_index.file_number(), key.size(),
blob_index.offset(), blob_index.size());
return succ;
}
@ -1757,16 +1658,16 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
continue;
}
- BlobHandle handle;
- s = handle.DecodeFrom(index_entry);
BlobIndex blob_index;
s = blob_index.DecodeFrom(index_entry);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while decoding index entry: %s",
s.ToString().c_str());
break;
}
- if (handle.filenumber() != bfptr->BlobFileNumber() ||
- handle.offset() != blob_offset) {
if (blob_index.file_number() != bfptr->BlobFileNumber() ||
blob_index.offset() != blob_offset) {
// Key has been overwritten. Drop the blob record.
continue;
}
@ -1843,12 +1744,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset,
&new_blob_offset, record.GetTTL());
- BlobHandle new_handle;
- new_handle.set_filenumber(newfile->BlobFileNumber());
- new_handle.set_size(record.Blob().size());
- new_handle.set_offset(new_blob_offset);
- new_handle.set_compression(bdb_options_.compression);
- new_handle.EncodeTo(&new_index_entry);
BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
new_blob_offset, record.Blob().size(),
bdb_options_.compression);
newfile->blob_count_++;
newfile->file_size_ +=
@ -2269,6 +2167,11 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
}
#ifndef NDEBUG
Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value) {
return GetBlobValue(key, index_entry, value);
}
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> blob_files;

@ -15,6 +15,7 @@
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
@ -215,9 +216,6 @@ class BlobDBImpl : public BlobDB {
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
- Status GetBlobValue(const Slice& key, const Slice& index_entry,
- PinnableSlice* value);
using BlobDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
@ -249,7 +247,7 @@ class BlobDBImpl : public BlobDB {
using BlobDB::PutUntil;
Status PutUntil(const WriteOptions& options, const Slice& key,
- const Slice& value_unc, uint64_t expiration) override;
const Slice& value, uint64_t expiration) override;
Status LinkToBaseDB(DB* db) override;
@ -263,6 +261,9 @@ class BlobDBImpl : public BlobDB {
~BlobDBImpl();
#ifndef NDEBUG
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
@ -281,6 +282,7 @@ class BlobDBImpl : public BlobDB {
private:
class GarbageCollectionWriteCallback;
class BlobInserter;
Status OpenPhase1();
@ -288,6 +290,9 @@ class BlobDBImpl : public BlobDB {
// Return true if a snapshot is created.
bool SetSnapshotIfNeeded(ReadOptions* read_options);
Status GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;
@ -314,9 +319,14 @@ class BlobDBImpl : public BlobDB {
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice, std::string* new_value);
Status PutBlobValue(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration,
SequenceNumber sequence, WriteBatch* batch);
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
const std::string& headerbuf, const Slice& key,
- const Slice& value, std::string* index_entry);
const Slice& value, uint64_t expiration,
std::string* index_entry);
// find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr
@ -327,8 +337,6 @@ class BlobDBImpl : public BlobDB {
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
- void UpdateWriteOptions(const WriteOptions& options);
void Shutdown();
// periodic sanity check. Bunch of checks
@ -426,10 +434,6 @@ class BlobDBImpl : public BlobDB {
Env* env_;
TTLExtractor* ttl_extractor_;
- // a boolean to capture whether write_options has been set
- std::atomic<bool> wo_set_;
- WriteOptions write_options_;
// the options that govern the behavior of Blob Storage
BlobDBOptions bdb_options_;
DBOptions db_options_;

@ -5,19 +5,24 @@
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include <algorithm>
#include <cstdlib>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "db/db_test_util.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "util/cast_util.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "utilities/blob_db/blob_db.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/blob_db/blob_index.h"
namespace rocksdb {
namespace blob_db {
@ -26,6 +31,12 @@ class BlobDBTest : public testing::Test {
public:
const int kMaxBlobSize = 1 << 14;
struct BlobRecord {
std::string key;
std::string value;
uint64_t expiration = 0;
};
BlobDBTest()
: dbname_(test::TmpDir() + "/blob_db_test"),
mock_env_(new MockTimeEnv(Env::Default())),
@ -127,6 +138,32 @@ class BlobDBTest : public testing::Test {
delete iter;
}
void VerifyBaseDB(
const std::map<std::string, KeyVersion> &expected_versions) {
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
DB *db = blob_db_->GetRootDB();
std::vector<KeyVersion> versions;
GetAllKeyVersions(db, "", "", &versions);
ASSERT_EQ(expected_versions.size(), versions.size());
size_t i = 0;
for (auto &key_version : expected_versions) {
const KeyVersion &expected_version = key_version.second;
ASSERT_EQ(expected_version.user_key, versions[i].user_key);
ASSERT_EQ(expected_version.sequence, versions[i].sequence);
ASSERT_EQ(expected_version.type, versions[i].type);
if (versions[i].type == kTypeValue) {
ASSERT_EQ(expected_version.value, versions[i].value);
} else {
ASSERT_EQ(kTypeBlobIndex, versions[i].type);
PinnableSlice value;
ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
versions[i].value, &value));
ASSERT_EQ(expected_version.value, value.ToString());
}
i++;
}
}
void InsertBlobs() {
WriteOptions wo;
std::string value;
@ -151,6 +188,7 @@ class BlobDBTest : public testing::Test {
TEST_F(BlobDBTest, Put) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -166,6 +204,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
@ -195,6 +234,7 @@ TEST_F(BlobDBTest, PutUntil) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
@ -226,6 +266,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.num_concurrent_simple_blobs = 1;
bdb_options.ttl_extractor = ttl_extractor_;
@ -275,6 +316,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
@ -322,6 +364,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
@ -369,6 +412,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
bdb_options.disable_background_tasks = true;
@ -403,6 +447,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
TEST_F(BlobDBTest, StackableDBGet) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -425,6 +470,7 @@ TEST_F(BlobDBTest, StackableDBGet) {
TEST_F(BlobDBTest, WriteBatch) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -441,6 +487,7 @@ TEST_F(BlobDBTest, WriteBatch) {
TEST_F(BlobDBTest, Delete) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -456,6 +503,7 @@ TEST_F(BlobDBTest, Delete) {
TEST_F(BlobDBTest, DeleteBatch) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
for (size_t i = 0; i < 100; i++) {
@ -473,6 +521,7 @@ TEST_F(BlobDBTest, DeleteBatch) {
TEST_F(BlobDBTest, Override) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -490,6 +539,7 @@ TEST_F(BlobDBTest, Override) {
TEST_F(BlobDBTest, Compression) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
bdb_options.compression = CompressionType::kSnappyCompression;
Open(bdb_options);
@ -541,6 +591,7 @@ TEST_F(BlobDBTest, MultipleWriters) {
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
BlobDBImpl *blob_db_impl =
@ -580,6 +631,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
@ -591,8 +643,8 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::PutBlobValue:Start"},
{"BlobDBImpl::PutBlobValue:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
SyncPoint::GetInstance()->EnableProcessing();
@ -615,6 +667,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_current_time(100);
@ -628,8 +681,8 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::PutBlobValue:Start"},
{"BlobDBImpl::PutBlobValue:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
SyncPoint::GetInstance()->EnableProcessing();
@ -656,6 +709,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
bdb_options.is_fifo = true;
bdb_options.blob_dir_size = 100;
bdb_options.blob_file_size = 100;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::string value(100, 'v');
@ -687,6 +741,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
// run the same test for Get(), MultiGet() and Iterator each.
for (int i = 0; i < 2; i++) {
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
blob_db_->Put(WriteOptions(), "foo", "bar");
@ -798,6 +853,7 @@ TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.min_blob_size = 0;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
std::map<std::string, std::string> data;
@ -894,6 +950,75 @@ TEST_F(BlobDBTest, OutOfSpace) {
ASSERT_TRUE(s.IsNoSpace());
}
TEST_F(BlobDBTest, InlineSmallValues) {
constexpr uint64_t kMaxExpiration = 1000;
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = kMaxExpiration;
bdb_options.min_blob_size = 100;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.disable_background_tasks = true;
Options options;
options.env = mock_env_.get();
mock_env_->set_current_time(0);
Open(bdb_options, options);
std::map<std::string, std::string> data;
std::map<std::string, KeyVersion> versions;
SequenceNumber first_non_ttl_seq = kMaxSequenceNumber;
SequenceNumber first_ttl_seq = kMaxSequenceNumber;
SequenceNumber last_non_ttl_seq = 0;
SequenceNumber last_ttl_seq = 0;
for (size_t i = 0; i < 1000; i++) {
bool is_small_value = rnd.Next() % 2;
bool has_ttl = rnd.Next() % 2;
uint64_t expiration = rnd.Next() % kMaxExpiration;
int len = is_small_value ? 50 : 200;
std::string key = "key" + ToString(i);
std::string value = test::RandomHumanReadableString(&rnd, len);
std::string blob_index;
data[key] = value;
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
if (!has_ttl) {
ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
} else {
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
}
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
versions[key] =
KeyVersion(key, value, sequence,
(is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
if (!is_small_value) {
if (!has_ttl) {
first_non_ttl_seq = std::min(first_non_ttl_seq, sequence);
last_non_ttl_seq = std::max(last_non_ttl_seq, sequence);
} else {
first_ttl_seq = std::min(first_ttl_seq, sequence);
last_ttl_seq = std::max(last_ttl_seq, sequence);
}
}
}
VerifyDB(data);
VerifyBaseDB(versions);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(2, blob_files.size());
std::shared_ptr<BlobFile> non_ttl_file;
std::shared_ptr<BlobFile> ttl_file;
if (blob_files[0]->HasTTL()) {
ttl_file = blob_files[0];
non_ttl_file = blob_files[1];
} else {
non_ttl_file = blob_files[0];
ttl_file = blob_files[1];
}
ASSERT_FALSE(non_ttl_file->HasTTL());
ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSNRange().first);
ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSNRange().second);
ASSERT_TRUE(ttl_file->HasTTL());
ASSERT_EQ(first_ttl_seq, ttl_file->GetSNRange().first);
ASSERT_EQ(last_ttl_seq, ttl_file->GetSNRange().second);
}
} // namespace blob_db
} // namespace rocksdb

@ -15,6 +15,7 @@
#include <algorithm>
#include <memory>
#include "db/dbformat.h"
#include "util/filename.h"
#include "util/logging.h"
#include "utilities/blob_db/blob_db_impl.h"
@ -36,7 +37,7 @@ BlobFile::BlobFile()
gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)),
time_range_(std::make_pair(0, 0)),
- sn_range_(std::make_pair(0, 0)),
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}
@ -55,7 +56,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)),
time_range_(std::make_pair(0, 0)),
- sn_range_(std::make_pair(0, 0)),
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
last_access_(-1),
last_fsync_(0),
header_valid_(false) {}

@ -0,0 +1,161 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/options.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace rocksdb {
namespace blob_db {
// BlobIndex is a pointer to the blob and metadata of the blob. The index is
// stored in base DB as ValueType::kTypeBlobIndex.
// There are three types of blob index:
//
// kInlinedTTL:
// +------+------------+---------------+
// | type | expiration | value |
// +------+------------+---------------+
// | char | varint64 | variable size |
// +------+------------+---------------+
//
// kBlob:
// +------+-------------+----------+----------+-------------+
// | type | file number | offset | size | compression |
// +------+-------------+----------+----------+-------------+
// | char | varint64 | varint64 | varint64 | char |
// +------+-------------+----------+----------+-------------+
//
// kBlobTTL:
// +------+------------+-------------+----------+----------+-------------+
// | type | expiration | file number | offset | size | compression |
// +------+------------+-------------+----------+----------+-------------+
// | char | varint64 | varint64 | varint64 | varint64 | char |
// +------+------------+-------------+----------+----------+-------------+
//
// There isn't a kInlined (without TTL) type since we can store it as a plain
// value (i.e. ValueType::kTypeValue).
class BlobIndex {
public:
enum class Type : unsigned char {
kInlinedTTL = 0,
kBlob = 1,
kBlobTTL = 2,
kUnknown = 3,
};
BlobIndex() : type_(Type::kUnknown) {}
bool IsInlined() const { return type_ == Type::kInlinedTTL; }
bool HasTTL() const {
return type_ == Type::kInlinedTTL || type_ == Type::kBlobTTL;
}
uint64_t expiration() const {
assert(HasTTL());
return expiration_;
}
const Slice& value() const {
assert(IsInlined());
return value_;
}
uint64_t file_number() const {
assert(!IsInlined());
return file_number_;
}
uint64_t offset() const {
assert(!IsInlined());
return offset_;
}
uint64_t size() const {
assert(!IsInlined());
return size_;
}
Status DecodeFrom(Slice slice) {
static const std::string kErrorMessage = "Error while decoding blob index";
assert(slice.size() > 0);
type_ = static_cast<Type>(*slice.data());
if (type_ >= Type::kUnknown) {
return Status::Corruption(
kErrorMessage,
"Unknown blob index type: " + ToString(static_cast<char>(type_)));
}
slice = Slice(slice.data() + 1, slice.size() - 1);
if (HasTTL()) {
if (!GetVarint64(&slice, &expiration_)) {
return Status::Corruption(kErrorMessage, "Corrupted expiration");
}
}
if (IsInlined()) {
value_ = slice;
} else {
if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) &&
GetVarint64(&slice, &size_) && slice.size() == 1) {
compression_ = static_cast<CompressionType>(*slice.data());
} else {
return Status::Corruption(kErrorMessage, "Corrupted blob offset");
}
}
return Status::OK();
}
static void EncodeInlinedTTL(std::string* dst, uint64_t expiration,
const Slice& value) {
assert(dst != nullptr);
dst->clear();
dst->reserve(1 + kMaxVarint64Length + value.size());
dst->push_back(static_cast<char>(Type::kInlinedTTL));
PutVarint64(dst, expiration);
dst->append(value.data(), value.size());
}
static void EncodeBlob(std::string* dst, uint64_t file_number,
uint64_t offset, uint64_t size,
CompressionType compression) {
assert(dst != nullptr);
dst->clear();
dst->reserve(kMaxVarint64Length * 3 + 2);
dst->push_back(static_cast<char>(Type::kBlob));
PutVarint64(dst, file_number);
PutVarint64(dst, offset);
PutVarint64(dst, size);
dst->push_back(static_cast<char>(compression));
}
static void EncodeBlobTTL(std::string* dst, uint64_t expiration,
uint64_t file_number, uint64_t offset,
uint64_t size, CompressionType compression) {
assert(dst != nullptr);
dst->clear();
dst->reserve(kMaxVarint64Length * 4 + 2);
dst->push_back(static_cast<char>(Type::kBlobTTL));
PutVarint64(dst, expiration);
PutVarint64(dst, file_number);
PutVarint64(dst, offset);
PutVarint64(dst, size);
dst->push_back(static_cast<char>(compression));
}
private:
Type type_ = Type::kUnknown;
uint64_t expiration_ = 0;
Slice value_;
uint64_t file_number_ = 0;
uint64_t offset_ = 0;
uint64_t size_ = 0;
CompressionType compression_ = kNoCompression;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE
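A quick round trip through the encoders above (a usage sketch, assuming this header is on the include path):

#include <cassert>
#include <string>
#include "utilities/blob_db/blob_index.h"

int main() {
  using rocksdb::blob_db::BlobIndex;
  std::string entry;
  // Point at a 1024-byte blob at offset 512 in blob file 7, expiring at the
  // given unix epoch.
  BlobIndex::EncodeBlobTTL(&entry, /*expiration=*/1700000000,
                           /*file_number=*/7, /*offset=*/512, /*size=*/1024,
                           rocksdb::kNoCompression);
  BlobIndex index;
  rocksdb::Status s = index.DecodeFrom(entry);
  assert(s.ok());
  assert(index.HasTTL() && !index.IsInlined());
  assert(index.file_number() == 7 && index.offset() == 512 &&
         index.size() == 1024);
  return 0;
}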