|
|
|
@ -749,32 +749,20 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { |
|
|
|
|
return bfile; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::Put(const WriteOptions& options, |
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key, |
|
|
|
|
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, |
|
|
|
|
const Slice& value) { |
|
|
|
|
std::string new_value; |
|
|
|
|
Slice value_slice; |
|
|
|
|
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value); |
|
|
|
|
return PutUntil(options, column_family, key, value_slice, expiration); |
|
|
|
|
return PutUntil(options, key, value_slice, expiration); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::Delete(const WriteOptions& options, |
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key) { |
|
|
|
|
Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { |
|
|
|
|
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); |
|
|
|
|
Status s = db_->Delete(options, column_family, key); |
|
|
|
|
Status s = db_->Delete(options, key); |
|
|
|
|
|
|
|
|
|
// add deleted key to list of keys that have been deleted for book-keeping
|
|
|
|
|
delete_keys_q_.enqueue({column_family, key.ToString(), lsn}); |
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::SingleDelete(const WriteOptions& wopts, |
|
|
|
|
ColumnFamilyHandle* column_family, |
|
|
|
|
const Slice& key) { |
|
|
|
|
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); |
|
|
|
|
Status s = db_->SingleDelete(wopts, column_family, key); |
|
|
|
|
|
|
|
|
|
delete_keys_q_.enqueue({column_family, key.ToString(), lsn}); |
|
|
|
|
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); |
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -788,10 +776,17 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
|
|
|
|
std::shared_ptr<BlobFile> last_file_; |
|
|
|
|
bool has_put_; |
|
|
|
|
std::string new_value_; |
|
|
|
|
uint32_t default_cf_id_; |
|
|
|
|
|
|
|
|
|
public: |
|
|
|
|
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) |
|
|
|
|
: impl_(impl), sequence_(seq), has_put_(false) {} |
|
|
|
|
: impl_(impl), |
|
|
|
|
sequence_(seq), |
|
|
|
|
has_put_(false), |
|
|
|
|
default_cf_id_(reinterpret_cast<ColumnFamilyHandleImpl*>( |
|
|
|
|
impl_->DefaultColumnFamily()) |
|
|
|
|
->cfd() |
|
|
|
|
->GetID()) {} |
|
|
|
|
|
|
|
|
|
WriteBatch& updates_blob() { return updates_blob_; } |
|
|
|
|
|
|
|
|
@ -803,6 +798,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
|
|
|
|
|
|
|
|
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key, |
|
|
|
|
const Slice& value_slice) override { |
|
|
|
|
if (column_family_id != default_cf_id_) { |
|
|
|
|
batch_rewrite_status_ = Status::NotSupported( |
|
|
|
|
"Blob DB doesn't support non-default column family."); |
|
|
|
|
return batch_rewrite_status_; |
|
|
|
|
} |
|
|
|
|
Slice value_unc; |
|
|
|
|
uint64_t expiration = |
|
|
|
|
impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_); |
|
|
|
@ -851,11 +851,28 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
|
|
|
|
|
|
|
|
|
virtual Status DeleteCF(uint32_t column_family_id, |
|
|
|
|
const Slice& key) override { |
|
|
|
|
if (column_family_id != default_cf_id_) { |
|
|
|
|
batch_rewrite_status_ = Status::NotSupported( |
|
|
|
|
"Blob DB doesn't support non-default column family."); |
|
|
|
|
return batch_rewrite_status_; |
|
|
|
|
} |
|
|
|
|
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); |
|
|
|
|
sequence_++; |
|
|
|
|
return Status::OK(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
virtual Status DeleteRange(uint32_t column_family_id, |
|
|
|
|
const Slice& begin_key, const Slice& end_key) { |
|
|
|
|
if (column_family_id != default_cf_id_) { |
|
|
|
|
batch_rewrite_status_ = Status::NotSupported( |
|
|
|
|
"Blob DB doesn't support non-default column family."); |
|
|
|
|
return batch_rewrite_status_; |
|
|
|
|
} |
|
|
|
|
WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id, |
|
|
|
|
begin_key, end_key); |
|
|
|
|
return Status::OK(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, |
|
|
|
|
const Slice& /*key*/) override { |
|
|
|
|
batch_rewrite_status_ = |
|
|
|
@ -932,12 +949,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::PutWithTTL(const WriteOptions& options, |
|
|
|
|
ColumnFamilyHandle* column_family, |
|
|
|
|
const Slice& key, const Slice& value, |
|
|
|
|
uint64_t ttl) { |
|
|
|
|
uint64_t now = EpochNow(); |
|
|
|
|
assert(std::numeric_limits<uint64_t>::max() - now > ttl); |
|
|
|
|
return PutUntil(options, column_family, key, value, now + ttl); |
|
|
|
|
return PutUntil(options, key, value, now + ttl); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, |
|
|
|
@ -952,8 +968,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, |
|
|
|
|
return *compression_output; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::PutUntil(const WriteOptions& options, |
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key, |
|
|
|
|
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, |
|
|
|
|
const Slice& value_unc, uint64_t expiration) { |
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); |
|
|
|
|
MutexLock l(&write_mutex_); |
|
|
|
@ -992,14 +1007,11 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, |
|
|
|
|
bfile->PathName().c_str(), key.ToString().c_str(), |
|
|
|
|
value.size(), s.ToString().c_str(), |
|
|
|
|
bfile->DumpState().c_str()); |
|
|
|
|
// Fallback just write to the LSM and get going
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
batch.Put(column_family, key, value); |
|
|
|
|
return db_->Write(options, &batch); |
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
batch.Put(column_family, key, index_entry); |
|
|
|
|
batch.Put(key, index_entry); |
|
|
|
|
|
|
|
|
|
// this goes to the base db and can be expensive
|
|
|
|
|
s = db_->Write(options, &batch); |
|
|
|
@ -1123,7 +1135,6 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile, |
|
|
|
|
|
|
|
|
|
std::vector<Status> BlobDBImpl::MultiGet( |
|
|
|
|
const ReadOptions& read_options, |
|
|
|
|
const std::vector<ColumnFamilyHandle*>& column_family, |
|
|
|
|
const std::vector<Slice>& keys, std::vector<std::string>* values) { |
|
|
|
|
// Get a snapshot to avoid blob file get deleted between we
|
|
|
|
|
// fetch and index entry and reading from the file.
|
|
|
|
@ -1131,21 +1142,18 @@ std::vector<Status> BlobDBImpl::MultiGet( |
|
|
|
|
bool snapshot_created = SetSnapshotIfNeeded(&ro); |
|
|
|
|
std::vector<std::string> values_lsm; |
|
|
|
|
values_lsm.resize(keys.size()); |
|
|
|
|
auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm); |
|
|
|
|
auto statuses = db_->MultiGet(ro, keys, &values_lsm); |
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1"); |
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2"); |
|
|
|
|
|
|
|
|
|
values->resize(keys.size()); |
|
|
|
|
assert(statuses.size() == keys.size()); |
|
|
|
|
assert(values_lsm.size() == keys.size()); |
|
|
|
|
for (size_t i = 0; i < keys.size(); ++i) { |
|
|
|
|
if (!statuses[i].ok()) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]); |
|
|
|
|
auto cfd = cfh->cfd(); |
|
|
|
|
|
|
|
|
|
Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i])); |
|
|
|
|
Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i])); |
|
|
|
|
statuses[i] = s; |
|
|
|
|
} |
|
|
|
|
if (snapshot_created) { |
|
|
|
@ -1163,9 +1171,8 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { |
|
|
|
|
return true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, |
|
|
|
|
const std::string& index_entry, std::string* value, |
|
|
|
|
SequenceNumber* sequence) { |
|
|
|
|
Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, |
|
|
|
|
std::string* value, SequenceNumber* sequence) { |
|
|
|
|
Slice index_entry_slice(index_entry); |
|
|
|
|
BlobHandle handle; |
|
|
|
|
Status s = handle.DecodeFrom(&index_entry_slice); |
|
|
|
@ -1269,10 +1276,12 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, |
|
|
|
|
|
|
|
|
|
if (bdb_options_.compression != kNoCompression) { |
|
|
|
|
BlockContents contents; |
|
|
|
|
auto cfh = |
|
|
|
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); |
|
|
|
|
s = UncompressBlockContentsForCompressionType( |
|
|
|
|
blob_value.data(), blob_value.size(), &contents, |
|
|
|
|
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, |
|
|
|
|
*(cfd->ioptions())); |
|
|
|
|
*(cfh->cfd()->ioptions())); |
|
|
|
|
*value = contents.data.ToString(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -1299,9 +1308,10 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, |
|
|
|
|
Status BlobDBImpl::Get(const ReadOptions& read_options, |
|
|
|
|
ColumnFamilyHandle* column_family, const Slice& key, |
|
|
|
|
PinnableSlice* value) { |
|
|
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); |
|
|
|
|
auto cfd = cfh->cfd(); |
|
|
|
|
|
|
|
|
|
if (column_family != DefaultColumnFamily()) { |
|
|
|
|
return Status::NotSupported( |
|
|
|
|
"Blob DB doesn't support non-default column family."); |
|
|
|
|
} |
|
|
|
|
// Get a snapshot to avoid blob file get deleted between we
|
|
|
|
|
// fetch and index entry and reading from the file.
|
|
|
|
|
// TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
|
|
|
|
@ -1310,11 +1320,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, |
|
|
|
|
|
|
|
|
|
Status s; |
|
|
|
|
std::string index_entry; |
|
|
|
|
s = db_->Get(ro, column_family, key, &index_entry); |
|
|
|
|
s = db_->Get(ro, key, &index_entry); |
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); |
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); |
|
|
|
|
if (s.ok()) { |
|
|
|
|
s = CommonGet(cfd, key, index_entry, value->GetSelf()); |
|
|
|
|
s = CommonGet(key, index_entry, value->GetSelf()); |
|
|
|
|
value->PinSelf(); |
|
|
|
|
} |
|
|
|
|
if (snapshot_created) { |
|
|
|
@ -1324,15 +1334,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Slice BlobDBIterator::value() const { |
|
|
|
|
Slice index_entry = iter_->value(); |
|
|
|
|
|
|
|
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_); |
|
|
|
|
auto cfd = cfh->cfd(); |
|
|
|
|
|
|
|
|
|
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1"); |
|
|
|
|
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2"); |
|
|
|
|
Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false), |
|
|
|
|
&vpart_); |
|
|
|
|
Slice index_entry = iter_->value(); |
|
|
|
|
Status s = |
|
|
|
|
db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_); |
|
|
|
|
return Slice(vpart_); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -2248,14 +2254,13 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) { |
|
|
|
|
return std::make_pair(true, -1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options, |
|
|
|
|
ColumnFamilyHandle* column_family) { |
|
|
|
|
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { |
|
|
|
|
// Get a snapshot to avoid blob file get deleted between we
|
|
|
|
|
// fetch and index entry and reading from the file.
|
|
|
|
|
ReadOptions ro(read_options); |
|
|
|
|
bool snapshot_created = SetSnapshotIfNeeded(&ro); |
|
|
|
|
return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family, |
|
|
|
|
this, snapshot_created, ro.snapshot); |
|
|
|
|
return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created, |
|
|
|
|
ro.snapshot); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
Status DestroyBlobDB(const std::string& dbname, const Options& options, |
|
|
|
@ -2299,8 +2304,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key, |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
return s; |
|
|
|
|
} |
|
|
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); |
|
|
|
|
return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence); |
|
|
|
|
return CommonGet(key, index_entry, nullptr, sequence); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const { |
|
|
|
|