Use SST file manager to track blob files as well (#8037)

Summary:
Extend support to track blob files in SST File manager.
 This PR notifies SstFileManager whenever a new blob file is created,
 via OnAddFile and  an obsolete blob file deleted via OnDeleteFile
 and delete file via ScheduleFileDeletion.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8037

Test Plan: Add new unit tests

Reviewed By: ltamasi

Differential Revision: D26891237

Pulled By: akankshamahajan15

fbshipit-source-id: 04c69ccfda2a73782fd5c51982dae58dd11979b6
main
Akanksha Mahajan 3 years ago committed by Facebook GitHub Bot
parent c603f2f898
commit 27d57a035e
  1. 1
      HISTORY.md
  2. 28
      db/blob/blob_file_builder.cc
  3. 5
      db/blob/blob_file_builder.h
  4. 35
      db/blob/blob_file_builder_test.cc
  5. 53
      db/blob/blob_file_completion_callback.h
  6. 26
      db/blob/db_blob_compaction_test.cc
  7. 20
      db/builder.cc
  8. 4
      db/builder.h
  9. 22
      db/compaction/compaction_job.cc
  10. 5
      db/compaction/compaction_job.h
  11. 25
      db/db_compaction_test.cc
  12. 19
      db/db_impl/db_impl.cc
  13. 2
      db/db_impl/db_impl.h
  14. 3
      db/db_impl/db_impl_compaction_flush.cc
  15. 10
      db/db_impl/db_impl_open.cc
  16. 320
      db/db_sst_test.cc
  17. 40
      db/db_test_util.cc
  18. 7
      db/db_test_util.h
  19. 39
      db/flush_job.cc
  20. 5
      db/flush_job.h
  21. 8
      db/repair_test.cc
  22. 2
      file/delete_scheduler_test.cc
  23. 52
      file/sst_file_manager_impl.cc
  24. 35
      file/sst_file_manager_impl.h
  25. 19
      include/rocksdb/sst_file_manager.h

@ -13,6 +13,7 @@
* Clarified the required semantics of Read() functions in FileSystem and Env APIs. Please ensure any custom implementations are compliant.
* For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files.
* Add EqualWithoutTimestamp() to Comparator.
* Extend support to track blob files in SSTFileManager whenever a blob file is created/deleted. Blob files will be scheduled to delete via SSTFileManager and SStFileManager will now take blob files in account while calculating size and space limits along with SST files.
### New Features
* Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision.

@ -8,6 +8,7 @@
#include <cassert>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_completion_callback.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
@ -34,12 +35,13 @@ BlobFileBuilder::BlobFileBuilder(
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
immutable_cf_options, mutable_cf_options, file_options,
job_id, column_family_id, column_family_name, io_priority,
write_hint, io_tracer, blob_file_paths,
write_hint, io_tracer, blob_callback, blob_file_paths,
blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
@ -50,6 +52,7 @@ BlobFileBuilder::BlobFileBuilder(
const std::string& column_family_name, Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: file_number_generator_(std::move(file_number_generator)),
@ -65,6 +68,7 @@ BlobFileBuilder::BlobFileBuilder(
io_priority_(io_priority),
write_hint_(write_hint),
io_tracer_(io_tracer),
blob_callback_(blob_callback),
blob_file_paths_(blob_file_paths),
blob_file_additions_(blob_file_additions),
blob_count_(0),
@ -303,11 +307,15 @@ Status BlobFileBuilder::CloseBlobFile() {
column_family_name_.c_str(), job_id_, blob_file_number,
blob_count_, blob_bytes_);
if (blob_callback_) {
s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back());
}
writer_.reset();
blob_count_ = 0;
blob_bytes_ = 0;
return Status::OK();
return s;
}
Status BlobFileBuilder::CloseBlobFileIfNeeded() {
@ -323,4 +331,20 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() {
return CloseBlobFile();
}
void BlobFileBuilder::Abandon() {
if (!IsBlobFileOpen()) {
return;
}
if (blob_callback_) {
// BlobFileBuilder::Abandon() is called because of error while writing to
// Blob files. So we can ignore the below error.
blob_callback_->OnBlobFileCompleted(blob_file_paths_->back())
.PermitUncheckedError();
}
writer_.reset();
blob_count_ = 0;
blob_bytes_ = 0;
}
} // namespace ROCKSDB_NAMESPACE

@ -27,6 +27,7 @@ class Status;
class Slice;
class BlobLogWriter;
class IOTracer;
class BlobFileCompletionCallback;
class BlobFileBuilder {
public:
@ -39,6 +40,7 @@ class BlobFileBuilder {
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
@ -52,6 +54,7 @@ class BlobFileBuilder {
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
const std::shared_ptr<IOTracer>& io_tracer,
BlobFileCompletionCallback* blob_callback,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
@ -62,6 +65,7 @@ class BlobFileBuilder {
Status Add(const Slice& key, const Slice& value, std::string* blob_index);
Status Finish();
void Abandon();
private:
bool IsBlobFileOpen() const;
@ -85,6 +89,7 @@ class BlobFileBuilder {
Env::IOPriority io_priority_;
Env::WriteLifeTimeHint write_hint_;
std::shared_ptr<IOTracer> io_tracer_;
BlobFileCompletionCallback* blob_callback_;
std::vector<std::string>* blob_file_paths_;
std::vector<BlobFileAddition>* blob_file_additions_;
std::unique_ptr<BlobLogWriter> writer_;

@ -144,8 +144,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -228,8 +229,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -314,8 +316,9 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
for (size_t i = 0; i < number_of_blobs; ++i) {
const std::string key = std::to_string(i);
@ -367,8 +370,9 @@ TEST_F(BlobFileBuilderTest, Compression) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string uncompressed_value(value_size, 'x');
@ -449,8 +453,9 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
[](void* arg) {
@ -527,8 +532,9 @@ TEST_F(BlobFileBuilderTest, Checksum) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
const std::string key("1");
const std::string value("deadbeef");
@ -623,8 +629,9 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
write_hint, nullptr /*IOTracer*/,
nullptr /*BlobFileCompletionCallback*/,
&blob_file_paths, &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);

@ -0,0 +1,53 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/error_handler.h"
#include "file/sst_file_manager_impl.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class BlobFileCompletionCallback {
public:
BlobFileCompletionCallback(SstFileManager* sst_file_manager,
InstrumentedMutex* mutex,
ErrorHandler* error_handler)
: sst_file_manager_(sst_file_manager),
mutex_(mutex),
error_handler_(error_handler) {}
Status OnBlobFileCompleted(const std::string& file_name) {
Status s;
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager_);
if (sfm) {
// Report new blob files to SstFileManagerImpl
s = sfm->OnAddFile(file_name);
if (sfm->IsMaxAllowedSpaceReached()) {
s = Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT(
"BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached");
InstrumentedMutexLock l(mutex_);
error_handler_->SetBGError(s, BackgroundErrorReason::kFlush);
}
}
#else
(void)file_name;
#endif // ROCKSDB_LITE
return s;
}
private:
SstFileManager* sst_file_manager_;
InstrumentedMutex* mutex_;
ErrorHandler* error_handler_;
};
} // namespace ROCKSDB_NAMESPACE

@ -16,32 +16,6 @@ class DBBlobCompactionTest : public DBTestBase {
explicit DBBlobCompactionTest()
: DBTestBase("/db_blob_compaction_test", /*env_do_fsync=*/false) {}
// TODO: copied from DBCompactionTest. Should be de-duplicated in the future.
std::vector<uint64_t> GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
std::vector<uint64_t> result;
result.reserve(blob_files.size());
for (const auto& blob_file : blob_files) {
result.emplace_back(blob_file.first);
}
return result;
}
#ifndef ROCKSDB_LITE
const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();

@ -23,6 +23,7 @@
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
@ -92,7 +93,8 @@ Status BuildTable(
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time, const std::string& db_id,
const std::string& db_session_id, const std::string* full_history_ts_low) {
const std::string& db_session_id, const std::string* full_history_ts_low,
BlobFileCompletionCallback* blob_callback) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@ -175,10 +177,11 @@ Status BuildTable(
std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
? new BlobFileBuilder(
versions, fs, &ioptions, &mutable_cf_options, &file_options,
job_id, column_family_id, column_family_name, io_priority,
write_hint, io_tracer, &blob_file_paths, blob_file_additions)
? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options,
&file_options, job_id, column_family_id,
column_family_name, io_priority, write_hint,
io_tracer, blob_callback, &blob_file_paths,
blob_file_additions)
: nullptr);
CompactionIterator c_iter(
@ -278,8 +281,9 @@ Status BuildTable(
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
} else {
blob_file_builder->Abandon();
}
blob_file_builder.reset();
}
@ -339,8 +343,10 @@ Status BuildTable(
if (blob_file_additions) {
for (const std::string& blob_file_path : blob_file_paths) {
ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg);
ignored = DeleteDBFile(&db_options, blob_file_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
ignored.PermitUncheckedError();
TEST_SYNC_POINT("BuildTable::AfterDeleteFile");
}
}
}

@ -38,6 +38,7 @@ class VersionEdit;
class TableBuilder;
class WritableFileWriter;
class InternalStats;
class BlobFileCompletionCallback;
// @param column_family_name Name of the column family that is also identified
// by column_family_id, or empty string if unknown. It must outlive the
@ -90,6 +91,7 @@ extern Status BuildTable(
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
const uint64_t file_creation_time = 0, const std::string& db_id = "",
const std::string& db_session_id = "",
const std::string* full_history_ts_low = nullptr);
const std::string* full_history_ts_low = nullptr,
BlobFileCompletionCallback* blob_callback = nullptr);
} // namespace ROCKSDB_NAMESPACE

@ -310,7 +310,8 @@ CompactionJob::CompactionJob(
const std::string& dbname, CompactionJobStats* compaction_job_stats,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
const std::string& db_session_id, std::string full_history_ts_low)
const std::string& db_session_id, std::string full_history_ts_low,
BlobFileCompletionCallback* blob_callback)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
@ -346,7 +347,8 @@ CompactionJob::CompactionJob(
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri),
full_history_ts_low_(std::move(full_history_ts_low)) {
full_history_ts_low_(std::move(full_history_ts_low)),
blob_callback_(blob_callback) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
@ -978,12 +980,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
? new BlobFileBuilder(
versions_, fs_.get(),
sub_compact->compaction->immutable_cf_options(),
mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
io_tracer_, &blob_file_paths, &sub_compact->blob_file_additions)
? new BlobFileBuilder(versions_, fs_.get(),
sub_compact->compaction->immutable_cf_options(),
mutable_cf_options, &file_options_, job_id_,
cfd->GetID(), cfd->GetName(),
Env::IOPriority::IO_LOW, write_hint_,
io_tracer_, blob_callback_, &blob_file_paths,
&sub_compact->blob_file_additions)
: nullptr);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
@ -1189,8 +1192,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (blob_file_builder) {
if (status.ok()) {
status = blob_file_builder->Finish();
} else {
blob_file_builder->Abandon();
}
blob_file_builder.reset();
}

@ -17,6 +17,7 @@
#include <utility>
#include <vector>
#include "db/blob/blob_file_completion_callback.h"
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
@ -80,7 +81,8 @@ class CompactionJob {
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::atomic<int>* manual_compaction_paused = nullptr,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "");
std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr);
~CompactionJob();
@ -204,6 +206,7 @@ class CompactionJob {
Env::Priority thread_pri_;
IOStatus io_status_;
std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
};
} // namespace ROCKSDB_NAMESPACE

@ -33,31 +33,6 @@ class DBCompactionTest : public DBTestBase {
public:
DBCompactionTest()
: DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {}
std::vector<uint64_t> GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
std::vector<uint64_t> result;
result.reserve(blob_files.size());
for (const auto& blob_file : blob_files) {
result.emplace_back(blob_file.first);
}
return result;
}
};
class DBCompactionTestWithParam

@ -231,7 +231,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_install_cv_(&mutex_) {
atomic_flush_install_cv_(&mutex_),
blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
&error_handler_) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
@ -511,6 +513,11 @@ Status DBImpl::CloseHelper() {
}
mutex_.Unlock();
// Below check is added as recovery_error_ is not checked and it causes crash
// in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
// reached.
error_handler_.GetRecoveryError().PermitUncheckedError();
// CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
@ -3943,7 +3950,8 @@ Status DestroyDB(const std::string& dbname, const Options& options,
std::string path_to_delete = dbname + "/" + fname;
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile || type == kWalFile) {
} else if (type == kTableFile || type == kWalFile ||
type == kBlobFile) {
del = DeleteDBFile(&soptions, path_to_delete, dbname,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
} else {
@ -3968,9 +3976,10 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (env->GetChildren(path, &filenames).ok()) {
for (const auto& fname : filenames) {
if (ParseFileName(fname, &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, table_path, dbname,
(type == kTableFile ||
type == kBlobFile)) { // Lock file will be deleted at end
std::string file_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, file_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
if (!del.ok() && result.ok()) {
result = del;

@ -2219,6 +2219,8 @@ class DBImpl : public DB {
InstrumentedCondVar atomic_flush_install_cv_;
bool wal_in_db_path_;
BlobFileCompletionCallback blob_callback_;
};
extern Options SanitizeOptions(const std::string& db, const Options& src);

@ -166,7 +166,8 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri,
io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow());
io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(),
&blob_callback_);
FileMetaData file_meta;
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");

@ -1364,7 +1364,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
io_tracer_, &event_logger_, job_id, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
db_id_, db_session_id_);
db_id_, db_session_id_, nullptr /*full_history_ts_low*/,
&blob_callback_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
@ -1723,6 +1724,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
std::vector<LiveFileMetaData> metadata;
// TODO: Once GetLiveFilesMetaData supports blob files, update the logic
// below to get known_file_sizes for blob files.
impl->mutex_.Lock();
impl->versions_->GetLiveFilesMetaData(&metadata);
impl->mutex_.Unlock();
@ -1755,13 +1758,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
FileType file_type;
std::string file_path = path + "/" + file_name;
if (ParseFileName(file_name, &file_number, &file_type) &&
file_type == kTableFile) {
(file_type == kTableFile || file_type == kBlobFile)) {
// TODO: Check for errors from OnAddFile?
if (known_file_sizes.count(file_name)) {
// We're assuming that each sst file name exists in at most one of
// the paths.
sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
/* compaction */ false)
sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
.PermitUncheckedError();
} else {
sfm->OnAddFile(file_path).PermitUncheckedError();

@ -306,13 +306,13 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify that we are tracking all sst files in dbname_
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
// Verify that we are tracking all sst files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
// Verify the total files size
@ -346,6 +346,260 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.blob_file_size = 32; // create one blob per file
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify that we are tracking all sst and blob files in dbname_
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
std::vector<uint64_t> blob_files = GetBlobFileNumbers();
ASSERT_EQ(files_added, blob_files.size());
// No blob file is obsoleted.
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
// No files were moved.
ASSERT_EQ(files_moved, 0);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
// Verify that we are tracking all sst and blob files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
// Verify the total files size
uint64_t total_files_size = 0;
for (auto& file_to_size : files_in_db) {
total_files_size += file_to_size.second;
}
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
Close();
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// Verify that we track all the files again after the DB is closed and opened.
Close();
sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// Destroy DB and it will remove all the blob files from sst file manager and
// blob files deletion will go through ScheduleFileDeletion.
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
Close();
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_deleted, blob_files.size());
ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.enable_blob_files = true;
options.blob_file_size = 32; // create one blob per file
options.disable_auto_compactions = true;
options.enable_blob_garbage_collection = true;
options.blob_garbage_collection_age_cutoff = 0.5;
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
int files_scheduled_to_delete = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_added++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
files_deleted++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (file_path->find(".blob") != std::string::npos) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
Random rnd(301);
constexpr char first_key[] = "first_key";
constexpr char first_value[] = "first_value";
constexpr char second_key[] = "second_key";
constexpr char second_value[] = "second_value";
ASSERT_OK(Put(first_key, first_value));
ASSERT_OK(Put(second_key, second_value));
ASSERT_OK(Flush());
constexpr char third_key[] = "third_key";
constexpr char third_value[] = "third_value";
constexpr char fourth_key[] = "fourth_key";
constexpr char fourth_value[] = "fourth_value";
constexpr char fifth_key[] = "fifth_key";
constexpr char fifth_value[] = "fifth_value";
ASSERT_OK(Put(third_key, third_value));
ASSERT_OK(Put(fourth_key, fourth_value));
ASSERT_OK(Put(fifth_key, fifth_value));
ASSERT_OK(Flush());
const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
ASSERT_EQ(original_blob_files.size(), 5);
ASSERT_EQ(files_added, 5);
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
ASSERT_EQ(files_moved, 0);
{
// Verify that we are tracking all sst and blob files in dbname_
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
const size_t cutoff_index = static_cast<size_t>(
options.blob_garbage_collection_age_cutoff * original_blob_files.size());
size_t expected_number_of_files = original_blob_files.size();
// Note: turning off enable_blob_files before the compaction results in
// garbage collected values getting inlined.
ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
expected_number_of_files -= cutoff_index;
files_added = 0;
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
ASSERT_EQ(Get(first_key), first_value);
ASSERT_EQ(Get(second_key), second_value);
ASSERT_EQ(Get(third_key), third_value);
ASSERT_EQ(Get(fourth_key), fourth_value);
ASSERT_EQ(Get(fifth_key), fifth_value);
const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
// No new file is added.
ASSERT_EQ(files_added, 0);
ASSERT_EQ(files_deleted, cutoff_index);
ASSERT_EQ(files_scheduled_to_delete, cutoff_index);
ASSERT_EQ(files_moved, 0);
// Original blob files below the cutoff should be gone, original blob files at
// or above the cutoff should be still there
for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
}
{
// Verify that we are tracking all sst and blob files in dbname_
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db));
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db));
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
}
Close();
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_deleted, 5);
ASSERT_EQ(files_scheduled_to_delete, 5);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(DBSSTTest, RateLimitedDelete) {
Destroy(last_options_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
@ -780,7 +1034,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
uint64_t first_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size));
ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
// Set the maximum allowed space usage to the current total size
@ -791,6 +1045,60 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
ASSERT_NOK(Flush());
}
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
options.disable_auto_compactions = true;
options.enable_blob_files = true;
DestroyAndReopen(options);
Random rnd(301);
// Generate a file containing keys.
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
}
ASSERT_OK(Flush());
uint64_t files_size = 0;
uint64_t total_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size));
// Make sure blob files are considered by SSTFileManage in size limits.
ASSERT_GT(files_size, 0);
total_files_size = files_size;
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size));
total_files_size += files_size;
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// Set the maximum allowed space usage to the current total size.
sfm->SetMaxAllowedSpaceUsage(files_size + 1);
bool max_allowed_space_reached = false;
bool delete_blob_file = false;
// Sync point called after blob file is closed and max allowed space is
// checked.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached",
[&](void* /*arg*/) { max_allowed_space_reached = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BuildTable::AfterDeleteFile",
[&](void* /*arg*/) { delete_blob_file = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key1", "val1"));
// This flush will fail
ASSERT_NOK(Flush());
ASSERT_TRUE(max_allowed_space_reached);
ASSERT_TRUE(delete_blob_file);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBSSTTest, CancellingCompactionsWorks) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
@ -821,7 +1129,7 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) {
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
@ -869,7 +1177,7 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
ASSERT_OK(Flush());
uint64_t total_file_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size));
// Set the maximum allowed space usage to the current total size
sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
@ -986,7 +1294,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
ASSERT_TRUE(bg_error_set);
uint64_t total_sst_files_size = 0;
std::unordered_map<std::string, uint64_t> files_in_db;
ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size));
ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

@ -1127,8 +1127,34 @@ std::string DBTestBase::FilesPerLevel(int cf) {
result.resize(last_non_zero_offset);
return result;
}
#endif // !ROCKSDB_LITE
std::vector<uint64_t> DBTestBase::GetBlobFileNumbers() {
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
Version* const current = cfd->current();
assert(current);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
const auto& blob_files = storage_info->GetBlobFiles();
std::vector<uint64_t> result;
result.reserve(blob_files.size());
for (const auto& blob_file : blob_files) {
result.emplace_back(blob_file.first);
}
return result;
}
size_t DBTestBase::CountFiles() {
size_t count = 0;
std::vector<std::string> files;
@ -1437,26 +1463,26 @@ void DBTestBase::CopyFile(const std::string& source,
ASSERT_OK(destfile->Close());
}
Status DBTestBase::GetAllSSTFiles(
std::unordered_map<std::string, uint64_t>* sst_files,
Status DBTestBase::GetAllDataFiles(
const FileType file_type, std::unordered_map<std::string, uint64_t>* files,
uint64_t* total_size /* = nullptr */) {
if (total_size) {
*total_size = 0;
}
std::vector<std::string> files;
Status s = env_->GetChildren(dbname_, &files);
std::vector<std::string> children;
Status s = env_->GetChildren(dbname_, &children);
if (s.ok()) {
for (auto& file_name : files) {
for (auto& file_name : children) {
uint64_t number;
FileType type;
if (ParseFileName(file_name, &number, &type) && type == kTableFile) {
if (ParseFileName(file_name, &number, &type) && type == file_type) {
std::string file_path = dbname_ + "/" + file_name;
uint64_t file_size = 0;
s = env_->GetFileSize(file_path, &file_size);
if (!s.ok()) {
break;
}
(*sst_files)[file_path] = file_size;
(*files)[file_path] = file_size;
if (total_size) {
*total_size += file_size;
}

@ -1064,6 +1064,8 @@ class DBTestBase : public testing::Test {
int TotalTableFiles(int cf = 0, int levels = -1);
#endif // ROCKSDB_LITE
std::vector<uint64_t> GetBlobFileNumbers();
// Return spread of files per level
std::string FilesPerLevel(int cf = 0);
@ -1153,8 +1155,9 @@ class DBTestBase : public testing::Test {
void CopyFile(const std::string& source, const std::string& destination,
uint64_t size = 0);
Status GetAllSSTFiles(std::unordered_map<std::string, uint64_t>* sst_files,
uint64_t* total_size = nullptr);
Status GetAllDataFiles(const FileType file_type,
std::unordered_map<std::string, uint64_t>* sst_files,
uint64_t* total_size = nullptr);
std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);

@ -80,24 +80,22 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
}
}
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
uint64_t max_memtable_id, const FileOptions& file_options,
VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low)
FlushJob::FlushJob(
const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
const FileOptions& file_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, FSDirectory* db_directory,
FSDirectory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id, const std::string& db_session_id,
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
: dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
@ -128,7 +126,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
thread_pri_(thread_pri),
io_tracer_(io_tracer),
clock_(db_options_.clock),
full_history_ts_low_(std::move(full_history_ts_low)) {
full_history_ts_low_(std::move(full_history_ts_low)),
blob_callback_(blob_callback) {
// Update the thread status to indicate flush.
ReportStartedFlush();
TEST_SYNC_POINT("FlushJob::FlushJob()");
@ -418,7 +417,7 @@ Status FlushJob::WriteLevel0Table() {
TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
creation_time, oldest_key_time, write_hint, current_time, db_id_,
db_session_id_, full_history_ts_low);
db_session_id_, full_history_ts_low, blob_callback_);
if (!io_s.ok()) {
io_status_ = io_s;
}

@ -17,6 +17,7 @@
#include <utility>
#include <vector>
#include "db/blob/blob_file_completion_callback.h"
#include "db/column_family.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
@ -73,7 +74,8 @@ class FlushJob {
const bool sync_output_directory, const bool write_manifest,
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_id = "", const std::string& db_session_id = "",
std::string full_history_ts_low = "");
std::string full_history_ts_low = "",
BlobFileCompletionCallback* blob_callback = nullptr);
~FlushJob();
@ -165,6 +167,7 @@ class FlushJob {
SystemClock* clock_;
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
};
} // namespace ROCKSDB_NAMESPACE

@ -184,7 +184,7 @@ TEST_F(RepairTest, UnflushedSst) {
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_EQ(total_ssts_size, 0);
}
// Need to get path before Close() deletes db_, but delete it after Close() to
@ -203,7 +203,7 @@ TEST_F(RepairTest, UnflushedSst) {
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_GT(total_ssts_size, 0);
}
ASSERT_EQ(Get("key"), "val");
@ -221,7 +221,7 @@ TEST_F(RepairTest, SeparateWalDir) {
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_EQ(total_ssts_size, 0);
}
std::string manifest_path =
@ -241,7 +241,7 @@ TEST_F(RepairTest, SeparateWalDir) {
{
uint64_t total_ssts_size;
std::unordered_map<std::string, uint64_t> sst_files;
ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size));
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size));
ASSERT_GT(total_ssts_size, 0);
}
ASSERT_EQ(Get("key"), "val");

@ -87,7 +87,7 @@ class DeleteSchedulerTest : public testing::Test {
std::string data(size, 'A');
EXPECT_OK(f->Append(data));
EXPECT_OK(f->Close());
sst_file_mgr_->OnAddFile(file_path, false);
sst_file_mgr_->OnAddFile(file_path);
return file_path;
}

@ -27,7 +27,6 @@ SstFileManagerImpl::SstFileManagerImpl(
fs_(fs),
logger_(logger),
total_files_size_(0),
in_progress_files_size_(0),
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0),
@ -60,23 +59,24 @@ void SstFileManagerImpl::Close() {
}
}
Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
bool compaction) {
Status SstFileManagerImpl::OnAddFile(const std::string& file_path) {
uint64_t file_size;
Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
if (s.ok()) {
MutexLock l(&mu_);
OnAddFileImpl(file_path, file_size, compaction);
OnAddFileImpl(file_path, file_size);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile",
const_cast<std::string*>(&file_path));
return s;
}
Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
uint64_t file_size, bool compaction) {
uint64_t file_size) {
MutexLock l(&mu_);
OnAddFileImpl(file_path, file_size, compaction);
TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
OnAddFileImpl(file_path, file_size);
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile",
const_cast<std::string*>(&file_path));
return Status::OK();
}
@ -85,7 +85,8 @@ Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
MutexLock l(&mu_);
OnDeleteFileImpl(file_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnDeleteFile",
const_cast<std::string*>(&file_path));
return Status::OK();
}
@ -99,19 +100,6 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
}
}
cur_compactions_reserved_size_ -= size_added_by_compaction;
auto new_files = c->edit()->GetNewFiles();
for (auto& new_file : new_files) {
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
new_file.second.fd.GetNumber(),
new_file.second.fd.GetPathId());
if (in_progress_files_.find(fn) != in_progress_files_.end()) {
auto tracked_file = tracked_files_.find(fn);
assert(tracked_file != tracked_files_.end());
in_progress_files_size_ -= tracked_file->second;
in_progress_files_.erase(fn);
}
}
}
Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
@ -122,7 +110,7 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
if (file_size != nullptr) {
*file_size = tracked_files_[old_path];
}
OnAddFileImpl(new_path, tracked_files_[old_path], false);
OnAddFileImpl(new_path, tracked_files_[old_path]);
OnDeleteFileImpl(old_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
@ -199,7 +187,6 @@ bool SstFileManagerImpl::EnoughRoomForCompaction(
if (compaction_buffer_size_ == 0) {
needed_headroom += reserved_disk_buffer_;
}
needed_headroom -= in_progress_files_size_;
if (free_space < needed_headroom + size_added_by_compaction) {
// We hit the condition of not enough disk space
ROCKS_LOG_ERROR(logger_,
@ -440,24 +427,15 @@ void SstFileManagerImpl::WaitForEmptyTrash() {
}
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
uint64_t file_size, bool compaction) {
uint64_t file_size) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file != tracked_files_.end()) {
// File was added before, we will just update the size
assert(!compaction);
total_files_size_ -= tracked_file->second;
total_files_size_ += file_size;
cur_compactions_reserved_size_ -= file_size;
} else {
total_files_size_ += file_size;
if (compaction) {
// Keep track of the size of files created by in-progress compactions.
// When calculating whether there's enough headroom for new compactions,
// this will be subtracted from cur_compactions_reserved_size_.
// Otherwise, compactions will be double counted.
in_progress_files_size_ += file_size;
in_progress_files_.insert(file_path);
}
}
tracked_files_[file_path] = file_size;
}
@ -466,16 +444,10 @@ void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file == tracked_files_.end()) {
// File is not tracked
assert(in_progress_files_.find(file_path) == in_progress_files_.end());
return;
}
total_files_size_ -= tracked_file->second;
// Check if it belonged to an in-progress compaction
if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
in_progress_files_size_ -= tracked_file->second;
in_progress_files_.erase(file_path);
}
tracked_files_.erase(tracked_file);
}

@ -21,9 +21,8 @@ class FileSystem;
class SystemClock;
class Logger;
// SstFileManager is used to track SST files in the DB and control their
// deletion rate.
// All SstFileManager public functions are thread-safe.
// SstFileManager is used to track SST and blob files in the DB and control
// their deletion rate. All SstFileManager public functions are thread-safe.
class SstFileManagerImpl : public SstFileManager {
public:
explicit SstFileManagerImpl(const std::shared_ptr<SystemClock>& clock,
@ -35,24 +34,23 @@ class SstFileManagerImpl : public SstFileManager {
~SstFileManagerImpl();
// DB will call OnAddFile whenever a new sst file is added.
Status OnAddFile(const std::string& file_path, bool compaction = false);
// DB will call OnAddFile whenever a new sst/blob file is added.
Status OnAddFile(const std::string& file_path);
// Overload where size of the file is provided by the caller rather than
// queried from the filesystem. This is an optimization.
Status OnAddFile(const std::string& file_path, uint64_t file_size,
bool compaction);
Status OnAddFile(const std::string& file_path, uint64_t file_size);
// DB will call OnDeleteFile whenever an sst file is deleted.
// DB will call OnDeleteFile whenever a sst/blob file is deleted.
Status OnDeleteFile(const std::string& file_path);
// DB will call OnMoveFile whenever an sst file is move to a new path.
// DB will call OnMoveFile whenever a sst/blob file is move to a new path.
Status OnMoveFile(const std::string& old_path, const std::string& new_path,
uint64_t* file_size = nullptr);
// Update the maximum allowed space that should be used by RocksDB, if
// the total size of the SST files exceeds max_allowed_space, writes to
// RocksDB will fail.
// the total size of the SST and blob files exceeds max_allowed_space, writes
// to RocksDB will fail.
//
// Setting max_allowed_space to 0 will disable this feature, maximum allowed
// space will be infinite (Default value).
@ -62,8 +60,8 @@ class SstFileManagerImpl : public SstFileManager {
void SetCompactionBufferSize(uint64_t compaction_buffer_size) override;
// Return true if the total size of SST files exceeded the maximum allowed
// space usage.
// Return true if the total size of SST and blob files exceeded the maximum
// allowed space usage.
//
// thread-safe.
bool IsMaxAllowedSpaceReached() override;
@ -142,8 +140,7 @@ class SstFileManagerImpl : public SstFileManager {
private:
// REQUIRES: mutex locked
void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
bool compaction);
void OnAddFileImpl(const std::string& file_path, uint64_t file_size);
// REQUIRES: mutex locked
void OnDeleteFileImpl(const std::string& file_path);
@ -159,8 +156,6 @@ class SstFileManagerImpl : public SstFileManager {
port::Mutex mu_;
// The summation of the sizes of all files in tracked_files_ map
uint64_t total_files_size_;
// The summation of all output files of in-progress compactions
uint64_t in_progress_files_size_;
// Compactions should only execute if they can leave at least
// this amount of buffer space for logs and flushes
uint64_t compaction_buffer_size_;
@ -169,9 +164,7 @@ class SstFileManagerImpl : public SstFileManager {
// A map containing all tracked files and there sizes
// file_path => file_size
std::unordered_map<std::string, uint64_t> tracked_files_;
// A set of files belonging to in-progress compactions
std::unordered_set<std::string> in_progress_files_;
// The maximum allowed space (in bytes) for sst files.
// The maximum allowed space (in bytes) for sst and blob files.
uint64_t max_allowed_space_;
// DeleteScheduler used to throttle file deletition.
DeleteScheduler delete_scheduler_;
@ -191,7 +184,7 @@ class SstFileManagerImpl : public SstFileManager {
// compactions to run full throttle. If disk space is below this trigger,
// compactions will be gated by free disk space > input size
uint64_t free_space_trigger_;
// List of database error handler instances tracked by this sst file manager
// List of database error handler instances tracked by this SstFileManager.
std::list<ErrorHandler*> error_handler_list_;
// Pointer to ErrorHandler instance that is currently processing recovery
ErrorHandler* cur_instance_;

@ -19,17 +19,16 @@ namespace ROCKSDB_NAMESPACE {
class Env;
class Logger;
// SstFileManager is used to track SST files in the DB and control their
// deletion rate.
// All SstFileManager public functions are thread-safe.
// SstFileManager is used to track SST and blob files in the DB and control
// their deletion rate. All SstFileManager public functions are thread-safe.
// SstFileManager is not extensible.
class SstFileManager {
public:
virtual ~SstFileManager() {}
// Update the maximum allowed space that should be used by RocksDB, if
// the total size of the SST files exceeds max_allowed_space, writes to
// RocksDB will fail.
// the total size of the SST and blob files exceeds max_allowed_space, writes
// to RocksDB will fail.
//
// Setting max_allowed_space to 0 will disable this feature; maximum allowed
// space will be infinite (Default value).
@ -43,14 +42,14 @@ class SstFileManager {
// other background functions may continue, such as logging and flushing.
virtual void SetCompactionBufferSize(uint64_t compaction_buffer_size) = 0;
// Return true if the total size of SST files exceeded the maximum allowed
// space usage.
// Return true if the total size of SST and blob files exceeded the maximum
// allowed space usage.
//
// thread-safe.
virtual bool IsMaxAllowedSpaceReached() = 0;
// Returns true if the total size of SST files as well as estimated size
// of ongoing compactions exceeds the maximums allowed space usage.
// Returns true if the total size of SST and blob files as well as estimated
// size of ongoing compactions exceeds the maximums allowed space usage.
virtual bool IsMaxAllowedSpaceReachedIncludingCompactions() = 0;
// Return the total size of all tracked files.
@ -87,7 +86,7 @@ class SstFileManager {
};
// Create a new SstFileManager that can be shared among multiple RocksDB
// instances to track SST file and control there deletion rate.
// instances to track SST and blob files and control there deletion rate.
// Even though SstFileManager don't track WAL files but it still control
// there deletion rate.
//

Loading…
Cancel
Save