Add (Live)FileStorageInfo API (#8968)

Summary:
New classes FileStorageInfo and LiveFileStorageInfo and
'experimental' function DB::GetLiveFilesStorageInfo, which is intended
to largely replace several fragmented DB functions needed to create
checkpoints and backups.

This function is now used to create checkpoints and backups, because
it resolves many (probably not all) of the prior complications caused by
checkpoint not having atomic access to DB metadata. This also ensures strong
functional test coverage of the new API. Specifically, much of the old
CheckpointImpl::CreateCustomCheckpoint has been migrated to and
updated in DBImpl::GetLiveFilesStorageInfo, with the former now
calling the latter.
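
As a rough illustration of how a caller might consume the results of the new function, the sketch below decides how each reported file should be materialized in a live copy. It does not call RocksDB: `LiveFileInfoSketch` is a hypothetical stand-in that mirrors only a few fields of `LiveFileStorageInfo`, and `CopyAction`, `PlanCopy`, and `SourcePath` are names invented for this example.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in mirroring a subset of LiveFileStorageInfo fields.
struct LiveFileInfoSketch {
  std::string relative_filename;     // e.g. "000012.sst"
  std::string directory;             // no trailing '/'
  uint64_t size = 0;                 // bytes that belong to the copy
  std::string replacement_contents;  // non-empty for CURRENT
  bool trim_to_size = false;         // true for MANIFEST and the last WAL
};

// Hypothetical classification of how a file could be copied.
enum class CopyAction { kWriteContents, kCopyPrefix, kCopyOrLinkWhole };

// Picks a copy strategy from the fields described in this change.
inline CopyAction PlanCopy(const LiveFileInfoSketch& info) {
  if (!info.replacement_contents.empty()) {
    // The on-disk file is ignored; write the saved contents instead.
    return CopyAction::kWriteContents;
  }
  if (info.trim_to_size) {
    // The on-disk file may have grown; only the first `size` bytes count.
    return CopyAction::kCopyPrefix;
  }
  // Immutable file: size on disk is expected to equal `size`.
  return CopyAction::kCopyOrLinkWhole;
}

// Joins directory and relative name the same way the updated DBTest does.
inline std::string SourcePath(const LiveFileInfoSketch& info) {
  return info.directory + "/" + info.relative_filename;
}
```

A real caller would presumably bracket the call with DisableFileDeletions() and EnableFileDeletions(), as the header comment in db.h recommends.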

Also, the class FileStorageInfo in metadata.h compatibly replaces
BackupFileInfo and serves as a new base class for SstFileMetaData.
Some old fields of SstFileMetaData are still provided (for now) but
deprecated.
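
The compatibility handling of the deprecated `name` field can be sketched in isolation. The helper below, `SplitSstName` (a hypothetical name, not part of RocksDB), follows the logic of the new SstFileMetaData constructor: the relative file name has no leading '/', while the deprecated name always has one.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Returns {relative_filename, deprecated_name} for a given input name.
// Hypothetical helper written for illustration only.
inline std::pair<std::string, std::string> SplitSstName(
    const std::string& file_name) {
  if (file_name.empty()) {
    return {"", ""};
  }
  if (file_name[0] == '/') {
    // Old-style input: strip the leading '/' for the new field.
    return {file_name.substr(1), file_name};
  }
  // New-style input: add the leading '/' for the deprecated field.
  return {file_name, "/" + file_name};
}
```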

Although FileStorageInfo::directory is accurate when using db_paths
and/or cf_paths, these have never been supported by either Checkpoint
or BackupEngine and still are not. This change now detects
these cases and returns NotSupported when appropriate. (More work
is needed for full support.)

Somehow this change broke ProgressCallbackDuringBackup, but
the progress_callback logic was dubious to begin with because it
would call the callback based on copy buffer size, not size actually
copied. Logic and test updated to track size actually copied
per-thread.
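
The difference between counting buffer capacity and counting bytes actually copied can be shown with a small standalone sketch. `CopyWithProgress` is a hypothetical function written for this example; the real backup code may structure its loop differently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>

// Copies `src` into `*dst` in chunks of at most `buf_size` bytes and invokes
// `callback` once per `interval` bytes actually copied. Returns the number
// of callback invocations.
inline uint64_t CopyWithProgress(const std::string& src, std::string* dst,
                                 size_t buf_size, uint64_t interval,
                                 const std::function<void()>& callback) {
  assert(dst != nullptr && buf_size > 0 && interval > 0);
  uint64_t bytes_since_callback = 0;
  uint64_t calls = 0;
  size_t offset = 0;
  while (offset < src.size()) {
    const size_t n = std::min(buf_size, src.size() - offset);
    dst->append(src, offset, n);
    offset += n;
    // Count what was copied (n), not the buffer capacity (buf_size).
    bytes_since_callback += n;
    while (bytes_since_callback >= interval) {
      bytes_since_callback -= interval;
      callback();
      ++calls;
    }
  }
  return calls;
}
```

With a 2-byte file, a 1024-byte buffer, and a 1000-byte interval, this version never fires the callback, whereas counting the buffer size would fire it once.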

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8968

Test Plan:
tests updated.
DB::GetLiveFilesStorageInfo mostly tested by use in CheckpointImpl.
DBTest.SnapshotFiles updated to also test GetLiveFilesStorageInfo,
including reading the data after DB close.
Added CheckpointTest.CheckpointWithDbPath (NotSupported).

Reviewed By: siying

Differential Revision: D31242045

Pulled By: pdillinger

fbshipit-source-id: b183d1ce9799e220daaefd6b3b5365d98de676c0
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent 678ba5e41c
commit 3ffb3baa0b
 HISTORY.md                                 |   2
 db/db_filesnapshot.cc                      | 320
 db/db_impl/db_impl.cc                      |   9
 db/db_impl/db_impl.h                       |   6
 db/db_impl/db_impl_open.cc                 |   7
 db/db_test.cc                              |  65
 db/version_set.cc                          |   1
 db/version_set.h                           |   1
 file/filename.cc                           |  35
 file/filename.h                            |  15
 include/rocksdb/data_structure.h           |   3
 include/rocksdb/db.h                       |  11
 include/rocksdb/metadata.h                 | 169
 include/rocksdb/options.h                  |  10
 include/rocksdb/utilities/backup_engine.h  |  14
 include/rocksdb/utilities/checkpoint.h     |   7
 include/rocksdb/utilities/stackable_db.h   |   6
 utilities/backupable/backupable_db.cc      |  89
 utilities/backupable/backupable_db_test.cc |  93
 utilities/checkpoint/checkpoint_impl.cc    | 260
 utilities/checkpoint/checkpoint_impl.h     |   1
 utilities/checkpoint/checkpoint_test.cc    |  13

@ -12,6 +12,7 @@
### New Features
* Print information about blob files when using "ldb list_live_files_metadata"
* Provided support for SingleDelete with user defined timestamp.
* Experimental new function DB::GetLiveFilesStorageInfo offers essentially a unified version of other functions like GetLiveFiles, GetLiveFilesChecksumInfo, and GetSortedWalFiles. Checkpoints and backups could show small behavioral changes and/or improved performance as they now use this new API.
* Add remote compaction read/write bytes statistics: `REMOTE_COMPACT_READ_BYTES`, `REMOTE_COMPACT_WRITE_BYTES`.
* Introduce an experimental feature to dump out the blocks from block cache and insert them to the secondary cache to reduce the cache warmup time (e.g., used while migrating DB instance). More information is in `class CacheDumper` and `CacheDumpedLoader` at `rocksdb/utilities/cache_dump_load.h`. Note that this feature is still experimental and subject to change.
* Introduced a new BlobDB configuration option `blob_garbage_collection_force_threshold`, which can be used to trigger compactions targeting the SST files which reference the oldest blob files when the ratio of garbage in those blob files meets or exceeds the specified threshold. This can reduce space amplification with skewed workloads where the affected SST files might not otherwise get picked up for compaction.
@ -20,6 +21,7 @@
* Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
* Made SliceTransform extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method. The Capped and Prefixed transform classes return a short name (no length); use GetId for the fully qualified name.
* Made FileChecksumGenFactory, SstPartitionerFactory, TablePropertiesCollectorFactory, and WalFilter extend the Customizable class and added a CreateFromString method.
* Some fields of SstFileMetaData are deprecated for compatibility with new base class FileStorageInfo.
* Add `file_temperature` to `IngestExternalFileArg` such that when ingesting SST files, we are able to indicate the temperature of this batch of files.
## 6.25.0 (2021-09-20)

@ -6,11 +6,11 @@
#ifndef ROCKSDB_LITE
#include <stdint.h>
#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "db/job_context.h"
@ -21,11 +21,52 @@
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/types.h"
#include "test_util/sync_point.h"
#include "util/file_checksum_helper.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
Status DBImpl::FlushForGetLiveFiles() {
mutex_.AssertHeld();
// flush all dirty data to disk.
Status status;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status =
AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kGetLiveFiles);
if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
cfd->UnrefAndTryDelete();
if (!status.ok() && !status.IsColumnFamilyDropped()) {
break;
} else if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
return status;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size,
bool flush_memtable) {
@ -34,39 +75,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
mutex_.Lock();
if (flush_memtable) {
// flush all dirty data to disk.
Status status;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, FlushOptions(),
FlushReason::kGetLiveFiles);
if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), FlushReason::kGetLiveFiles);
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
mutex_.Lock();
cfd->UnrefAndTryDelete();
if (!status.ok() && !status.IsColumnFamilyDropped()) {
break;
} else if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
Status status = FlushForGetLiveFiles();
if (!status.ok()) {
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
@ -157,6 +166,245 @@ Status DBImpl::GetCurrentWalFile(std::unique_ptr<LogFile>* current_log_file) {
return wal_manager_.GetLiveWalFile(current_logfile_number, current_log_file);
}
Status DBImpl::GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& opts,
std::vector<LiveFileStorageInfo>* files) {
// To avoid returning partial results, only move to output on success
assert(files);
files->clear();
std::vector<LiveFileStorageInfo> results;
// NOTE: This implementation was largely migrated from Checkpoint.
Status s;
VectorLogPtr live_wal_files;
bool flush_memtable = true;
if (!immutable_db_options_.allow_2pc) {
if (opts.wal_size_for_flush == port::kMaxUint64) {
flush_memtable = false;
} else if (opts.wal_size_for_flush > 0) {
// If outstanding log files are small, we skip the flush.
s = GetSortedWalFiles(live_wal_files);
if (!s.ok()) {
return s;
}
// Don't flush column families if total log size is smaller than
// wal_size_for_flush. We copy the log files instead.
// We may be able to cover the 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < opts.wal_size_for_flush) {
flush_memtable = false;
}
live_wal_files.clear();
}
}
// This is a modified version of GetLiveFiles, to get access to more
// metadata.
mutex_.Lock();
if (flush_memtable) {
Status status = FlushForGetLiveFiles();
if (!status.ok()) {
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
status.ToString().c_str());
return status;
}
}
// Make a set of all of the live table and blob files
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
VersionStorageInfo& vsi = *cfd->current()->storage_info();
auto& cf_paths = cfd->ioptions()->cf_paths;
auto GetDir = [&](size_t path_id) {
// Matching TableFileName() behavior
if (path_id >= cf_paths.size()) {
assert(false);
return cf_paths.back().path;
} else {
return cf_paths[path_id].path;
}
};
for (int level = 0; level < vsi.num_levels(); ++level) {
const auto& level_files = vsi.LevelFiles(level);
for (const auto& meta : level_files) {
assert(meta);
results.emplace_back();
LiveFileStorageInfo& info = results.back();
info.relative_filename = MakeTableFileName(meta->fd.GetNumber());
info.directory = GetDir(meta->fd.GetPathId());
info.file_number = meta->fd.GetNumber();
info.file_type = kTableFile;
info.size = meta->fd.GetFileSize();
if (opts.include_checksum_info) {
info.file_checksum_func_name = meta->file_checksum_func_name;
info.file_checksum = meta->file_checksum;
if (info.file_checksum_func_name.empty()) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
info.temperature = meta->temperature;
}
}
const auto& blob_files = vsi.GetBlobFiles();
for (const auto& pair : blob_files) {
const auto& meta = pair.second;
assert(meta);
results.emplace_back();
LiveFileStorageInfo& info = results.back();
info.relative_filename = BlobFileName(meta->GetBlobFileNumber());
info.directory = GetName(); // TODO?: support db_paths/cf_paths
info.file_number = meta->GetBlobFileNumber();
info.file_type = kBlobFile;
info.size = meta->GetBlobFileSize();
if (opts.include_checksum_info) {
info.file_checksum_func_name = meta->GetChecksumMethod();
info.file_checksum = meta->GetChecksumValue();
if (info.file_checksum_func_name.empty()) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
// TODO?: info.temperature
}
}
// Capture some final info before releasing mutex
const uint64_t manifest_number = versions_->manifest_file_number();
const uint64_t manifest_size = versions_->manifest_file_size();
const uint64_t options_number = versions_->options_file_number();
const uint64_t options_size = versions_->options_file_size_;
const uint64_t min_log_num = MinLogNumberToKeep();
mutex_.Unlock();
std::string manifest_fname = DescriptorFileName(manifest_number);
{ // MANIFEST
results.emplace_back();
LiveFileStorageInfo& info = results.back();
info.relative_filename = manifest_fname;
info.directory = GetName();
info.file_number = manifest_number;
info.file_type = kDescriptorFile;
info.size = manifest_size;
info.trim_to_size = true;
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
{ // CURRENT
results.emplace_back();
LiveFileStorageInfo& info = results.back();
info.relative_filename = kCurrentFileName;
info.directory = GetName();
info.file_type = kCurrentFile;
// CURRENT could be replaced so we have to record the contents we want
// for it
info.replacement_contents = manifest_fname + "\n";
info.size = manifest_fname.size() + 1;
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
// The OPTIONS file number is zero in read-write mode when OPTIONS file
// writing failed and the DB was configured with
// `fail_if_options_file_error == false`. In read-only mode the OPTIONS file
// number is zero when no OPTIONS file exists at all. In those cases we do not
// record any OPTIONS file in the live file list.
if (options_number != 0) {
results.emplace_back();
LiveFileStorageInfo& info = results.back();
info.relative_filename = OptionsFileName(options_number);
info.directory = GetName();
info.file_number = options_number;
info.file_type = kOptionsFile;
info.size = options_size;
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
// Some legacy testing stuff TODO: carefully clean up obsolete parts
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:FlushDone");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
if (s.ok()) {
s = FlushWAL(false /* sync */);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1");
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2");
// if we have more than one column family, we need to also get WAL files
if (s.ok()) {
s = GetSortedWalFiles(live_wal_files);
}
if (!s.ok()) {
return s;
}
size_t wal_size = live_wal_files.size();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Number of log files %" ROCKSDB_PRIszt, live_wal_files.size());
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
auto wal_dir = immutable_db_options_.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
results.emplace_back();
LiveFileStorageInfo& info = results.back();
auto f = live_wal_files[i]->PathName();
assert(!f.empty() && f[0] == '/');
info.relative_filename = f.substr(1);
info.directory = wal_dir;
info.file_number = live_wal_files[i]->LogNumber();
info.file_type = kWalFile;
info.size = live_wal_files[i]->SizeFileBytes();
// Only last should need to be trimmed
info.trim_to_size = (i + 1 == wal_size);
if (opts.include_checksum_info) {
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
info.file_checksum = kUnknownFileChecksum;
}
}
}
if (s.ok()) {
// Only move output on success
*files = std::move(results);
}
return s;
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
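
The flush decision at the top of DBImpl::GetLiveFilesStorageInfo can be restated as a pure function. The sketch below is standalone and assumes WAL sizes are passed in as plain integers; `ShouldFlushMemtable` is a hypothetical name, and `std::numeric_limits<uint64_t>::max()` stands in for `port::kMaxUint64`.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Returns true if memtables should be flushed before listing live files.
inline bool ShouldFlushMemtable(bool allow_2pc, uint64_t wal_size_for_flush,
                                const std::vector<uint64_t>& live_wal_sizes) {
  if (allow_2pc) {
    return true;  // The hunk above always flushes when 2PC is enabled.
  }
  if (wal_size_for_flush == std::numeric_limits<uint64_t>::max()) {
    return false;  // Caller asked to never flush; WAL files are used instead.
  }
  if (wal_size_for_flush == 0) {
    return true;  // Always flush.
  }
  uint64_t total_wal_size = 0;
  for (uint64_t s : live_wal_sizes) {
    total_wal_size += s;
  }
  // Skip the flush only when the outstanding WAL data is small.
  return total_wal_size >= wal_size_for_flush;
}
```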

@ -4284,11 +4284,16 @@ Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
uint64_t options_file_number = versions_->NewFileNumber();
std::string options_file_name =
OptionsFileName(GetName(), options_file_number);
// Retry if the file name happen to conflict with an existing one.
s = GetEnv()->RenameFile(file_name, options_file_name);
uint64_t options_file_size = 0;
s = GetEnv()->GetFileSize(file_name, &options_file_size);
if (s.ok()) {
// Retry if the file name happens to conflict with an existing one.
s = GetEnv()->RenameFile(file_name, options_file_name);
}
if (s.ok()) {
InstrumentedMutexLock l(&mutex_);
versions_->options_file_number_ = options_file_number;
versions_->options_file_size_ = options_file_size;
}
if (0 == disable_delete_obsolete_files_) {

@ -405,6 +405,10 @@ class DBImpl : public DB {
virtual Status GetLiveFilesChecksumInfo(
FileChecksumList* checksum_list) override;
virtual Status GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& opts,
std::vector<LiveFileStorageInfo>* files) override;
// Obtains the meta data of the specified column family of the DB.
// TODO(yhchiang): output parameter is placed in the end in this codebase.
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
@ -1231,6 +1235,8 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
Status FlushForGetLiveFiles();
#endif // !ROCKSDB_LITE
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;

@ -13,6 +13,7 @@
#include "db/error_handler.h"
#include "db/periodic_work_scheduler.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
@ -677,6 +678,12 @@ Status DBImpl::Recover(
}
}
versions_->options_file_number_ = options_file_number;
uint64_t options_file_size = 0;
if (options_file_number > 0) {
s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
&options_file_size);
}
versions_->options_file_size_ = options_file_size;
}
}
return s;

@ -11,11 +11,13 @@
// in Release build.
// which is a pity, it is a good test
#include <fcntl.h>
#include <algorithm>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>
#ifndef OS_WIN
#include <unistd.h>
#endif
@ -52,6 +54,7 @@
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
@ -2300,8 +2303,8 @@ TEST_F(DBTest, SnapshotFiles) {
uint64_t manifest_number = 0;
uint64_t manifest_size = 0;
std::vector<std::string> files;
dbfull()->DisableFileDeletions();
dbfull()->GetLiveFiles(files, &manifest_size);
ASSERT_OK(dbfull()->DisableFileDeletions());
ASSERT_OK(dbfull()->GetLiveFiles(files, &manifest_size));
// CURRENT, MANIFEST, OPTIONS, *.sst files (one for each CF)
ASSERT_EQ(files.size(), 5U);
@ -2330,18 +2333,17 @@ TEST_F(DBTest, SnapshotFiles) {
// latest manifest file
if (ParseFileName(files[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
if (number > manifest_number) {
manifest_number = number;
ASSERT_GE(size, manifest_size);
size = manifest_size; // copy only valid MANIFEST data
}
ASSERT_EQ(manifest_number, 0);
manifest_number = number;
ASSERT_GE(size, manifest_size);
size = manifest_size; // copy only valid MANIFEST data
}
}
CopyFile(src, dest, size);
}
// release file snapshot
dbfull()->DisableFileDeletions();
ASSERT_OK(dbfull()->EnableFileDeletions(/*force*/ false));
// overwrite one key, this key should not appear in the snapshot
std::vector<std::string> extras;
for (unsigned int i = 0; i < 1; i++) {
@ -2378,8 +2380,8 @@ TEST_F(DBTest, SnapshotFiles) {
uint64_t new_manifest_number = 0;
uint64_t new_manifest_size = 0;
std::vector<std::string> newfiles;
dbfull()->DisableFileDeletions();
dbfull()->GetLiveFiles(newfiles, &new_manifest_size);
ASSERT_OK(dbfull()->DisableFileDeletions());
ASSERT_OK(dbfull()->GetLiveFiles(newfiles, &new_manifest_size));
// find the new manifest file. assert that this manifest file is
// the same one as in the previous snapshot. But its size should be
@ -2391,20 +2393,41 @@ TEST_F(DBTest, SnapshotFiles) {
// latest manifest file
if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
if (number > new_manifest_number) {
uint64_t size;
new_manifest_number = number;
ASSERT_OK(env_->GetFileSize(src, &size));
ASSERT_GE(size, new_manifest_size);
}
ASSERT_EQ(new_manifest_number, 0);
uint64_t size;
new_manifest_number = number;
ASSERT_OK(env_->GetFileSize(src, &size));
ASSERT_GE(size, new_manifest_size);
}
}
}
ASSERT_EQ(manifest_number, new_manifest_number);
ASSERT_GT(new_manifest_size, manifest_size);
// release file snapshot
dbfull()->DisableFileDeletions();
// Also test GetLiveFilesStorageInfo
std::vector<LiveFileStorageInfo> new_infos;
ASSERT_OK(dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(),
&new_infos));
// Close DB (while deletions disabled)
Close();
// Validate
for (auto& info : new_infos) {
std::string path = info.directory + "/" + info.relative_filename;
uint64_t size;
ASSERT_OK(env_->GetFileSize(path, &size));
if (info.trim_to_size) {
ASSERT_LE(info.size, size);
} else if (!info.replacement_contents.empty()) {
ASSERT_EQ(info.size, info.replacement_contents.size());
} else {
ASSERT_EQ(info.size, size);
}
if (info.file_type == kDescriptorFile) {
ASSERT_EQ(info.file_number, manifest_number);
}
}
} while (ChangeCompactOptions());
}
@ -3119,6 +3142,12 @@ class ModelDB : public DB {
return Status::OK();
}
Status GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& /*opts*/,
std::vector<LiveFileStorageInfo>* /*files*/) override {
return Status::OK();
}
Status GetSortedWalFiles(VectorLogPtr& /*files*/) override {
return Status::OK();
}

@ -4059,6 +4059,7 @@ VersionSet::VersionSet(const std::string& dbname,
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
options_file_number_(0),
options_file_size_(0),
pending_manifest_file_number_(0),
last_sequence_(0),
last_allocated_sequence_(0),

@ -1374,6 +1374,7 @@ class VersionSet {
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t options_file_size_;
uint64_t pending_manifest_file_number_;
// The last seq visible to reads. It normally indicates the last sequence in
// the memtable but when using two write queues it could also indicate the

@ -20,9 +20,14 @@
namespace ROCKSDB_NAMESPACE {
const std::string kCurrentFileName = "CURRENT";
const std::string kOptionsFileNamePrefix = "OPTIONS-";
const std::string kTempFileNameSuffix = "dbtmp";
static const std::string kRocksDbTFileExt = "sst";
static const std::string kLevelDbTFileExt = "ldb";
static const std::string kRocksDBBlobFileExt = "blob";
static const std::string kArchivalDirName = "archive";
// Given a path, flatten the path name by replacing all chars not in
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
@ -96,11 +101,11 @@ std::string BlobFileName(const std::string& dbname, const std::string& blob_dir,
}
std::string ArchivalDirectory(const std::string& dir) {
return dir + "/" + ARCHIVAL_DIR;
return dir + "/" + kArchivalDirName;
}
std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
assert(number > 0);
return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log");
return MakeFileName(name + "/" + kArchivalDirName, number, "log");
}
std::string MakeTableFileName(const std::string& path, uint64_t number) {
@ -155,16 +160,20 @@ void FormatFileNumber(uint64_t number, uint32_t path_id, char* out_buf,
}
}
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
std::string DescriptorFileName(uint64_t number) {
assert(number > 0);
char buf[100];
snprintf(buf, sizeof(buf), "/MANIFEST-%06llu",
snprintf(buf, sizeof(buf), "MANIFEST-%06llu",
static_cast<unsigned long long>(number));
return dbname + buf;
return buf;
}
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
return dbname + "/" + DescriptorFileName(number);
}
std::string CurrentFileName(const std::string& dbname) {
return dbname + "/CURRENT";
return dbname + "/" + kCurrentFileName;
}
std::string LockFileName(const std::string& dbname) {
@ -213,11 +222,14 @@ std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts,
return log_dir + "/" + info_log_prefix.buf + ".old." + buf;
}
std::string OptionsFileName(const std::string& dbname, uint64_t file_num) {
std::string OptionsFileName(uint64_t file_num) {
char buffer[256];
snprintf(buffer, sizeof(buffer), "%s%06" PRIu64,
kOptionsFileNamePrefix.c_str(), file_num);
return dbname + "/" + buffer;
return buffer;
}
std::string OptionsFileName(const std::string& dbname, uint64_t file_num) {
return dbname + "/" + OptionsFileName(file_num);
}
std::string TempOptionsFileName(const std::string& dbname, uint64_t file_num) {
@ -331,11 +343,12 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
// Avoid strtoull() to keep filename format independent of the
// current locale
bool archive_dir_found = false;
if (rest.starts_with(ARCHIVAL_DIR)) {
if (rest.size() <= ARCHIVAL_DIR.size()) {
if (rest.starts_with(kArchivalDirName)) {
if (rest.size() <= kArchivalDirName.size()) {
return false;
}
rest.remove_prefix(ARCHIVAL_DIR.size() + 1); // Add 1 to remove / also
rest.remove_prefix(kArchivalDirName.size() +
1); // Add 1 to remove / also
if (log_type) {
*log_type = kArchivedLogFile;
}

@ -31,9 +31,9 @@ class SystemClock;
class WritableFileWriter;
#ifdef OS_WIN
const char kFilePathSeparator = '\\';
constexpr char kFilePathSeparator = '\\';
#else
const char kFilePathSeparator = '/';
constexpr char kFilePathSeparator = '/';
#endif
// Return the name of the log file with the specified number
@ -50,8 +50,6 @@ extern std::string BlobFileName(const std::string& bdirname, uint64_t number);
extern std::string BlobFileName(const std::string& dbname,
const std::string& blob_dir, uint64_t number);
static const std::string ARCHIVAL_DIR = "archive";
extern std::string ArchivalDirectory(const std::string& dbname);
// Return the name of the archived log file with the specified number
@ -89,6 +87,10 @@ extern void FormatFileNumber(uint64_t number, uint32_t path_id, char* out_buf,
extern std::string DescriptorFileName(const std::string& dbname,
uint64_t number);
extern std::string DescriptorFileName(uint64_t number);
extern const std::string kCurrentFileName; // = "CURRENT"
// Return the name of the current file. This file contains the name
// of the current manifest file. The result will be prefixed with
// "dbname".
@ -122,13 +124,14 @@ extern std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts,
const std::string& db_path = "",
const std::string& log_dir = "");
static const std::string kOptionsFileNamePrefix = "OPTIONS-";
static const std::string kTempFileNameSuffix = "dbtmp";
extern const std::string kOptionsFileNamePrefix; // = "OPTIONS-"
extern const std::string kTempFileNameSuffix; // = "dbtmp"
// Return an options file name given the "dbname" and file number.
// Format: OPTIONS-[number]
extern std::string OptionsFileName(const std::string& dbname,
uint64_t file_num);
extern std::string OptionsFileName(uint64_t file_num);
// Return a temp options file name given the "dbname" and file number.
// Format: OPTIONS-[number].dbtmp

@ -7,7 +7,9 @@
#include <assert.h>
#include <cstddef>
#include <cstdint>
#include <vector>
#include "rocksdb/rocksdb_namespace.h"
@ -45,4 +47,5 @@ class SmallEnumSet {
private:
uint64_t state_;
};
} // namespace ROCKSDB_NAMESPACE

@ -1450,6 +1450,15 @@ class DB {
// synchronized with GetLiveFiles.
virtual Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) = 0;
// EXPERIMENTAL: This function is not yet feature-complete.
// Get information about all live files that make up a DB, for making
// live copies (Checkpoint, backups, etc.) or other storage-related purposes.
// Use DisableFileDeletions() before and EnableFileDeletions() after to
// preserve the files for live copy.
virtual Status GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& opts,
std::vector<LiveFileStorageInfo>* files) = 0;
// Obtains the meta data of the specified column family of the DB.
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* /*column_family*/,
ColumnFamilyMetaData* /*metadata*/) {}
@ -1516,7 +1525,7 @@ class DB {
// this column family.
// (1) External SST files can be created using SstFileWriter.
// (2) External SST files can be exported from a particular column family in
// an existing DB.
// an existing DB using Checkpoint::ExportColumnFamily.
// Option in import_options specifies whether the external files are copied or
// moved (default is copy). When option specifies copy, managing files at
// external_file_path is caller's responsibility. When option specifies a

@ -5,9 +5,9 @@
#pragma once
#include <stdint.h>
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <vector>
@ -16,24 +16,66 @@
namespace ROCKSDB_NAMESPACE {
// The metadata that describes a SST file.
struct SstFileMetaData {
SstFileMetaData()
: size(0),
file_number(0),
smallest_seqno(0),
largest_seqno(0),
num_reads_sampled(0),
being_compacted(false),
num_entries(0),
num_deletions(0),
temperature(Temperature::kUnknown),
oldest_blob_file_number(0),
oldest_ancester_time(0),
file_creation_time(0) {}
// Basic identifiers and metadata for a file in a DB. This only includes
// information considered relevant for taking backups, checkpoints, or other
// services relating to DB file storage.
// This is only appropriate for immutable files, such as SST files or all
// files in a backup. See also LiveFileStorageInfo.
struct FileStorageInfo {
// The name of the file within its directory (e.g. "123456.sst")
std::string relative_filename;
// The directory containing the file, without a trailing '/'. This could be
// a DB path, wal_dir, etc.
std::string directory;
// The id of the file within a single DB. Set to 0 if the file does not have
// a number (e.g. CURRENT)
uint64_t file_number = 0;
// The type of the file as part of a DB.
FileType file_type = kTempFile;
// File size in bytes. See also `trim_to_size`.
uint64_t size = 0;
// This feature is experimental and subject to change.
Temperature temperature = Temperature::kUnknown;
// The checksum of a SST file, the value is decided by the file content and
// the checksum algorithm used for this SST file. The checksum function is
// identified by the file_checksum_func_name. If the checksum function is
// not specified, file_checksum is "0" by default.
std::string file_checksum;
// The name of the checksum function used to generate the file checksum
// value. If file checksum is not enabled (e.g., sst_file_checksum_func is
// null), file_checksum_func_name is kUnknownFileChecksumFuncName, which is
// "Unknown".
std::string file_checksum_func_name;
};
// Adds to FileStorageInfo the ability to capture the state of files that
// might change in a running DB.
struct LiveFileStorageInfo : public FileStorageInfo {
// If non-empty, this string represents the "saved" contents of the file
// for the current context. (This field is used for checkpointing CURRENT
// file.) In that case, size == replacement_contents.size() and file on disk
// should be ignored. If empty string, the file on disk should still have
// "saved" contents. (See trim_to_size.)
std::string replacement_contents;
// If true, the file on disk is allowed to be larger than `size` but only
// the first `size` bytes should be used for the current context. If false,
// the file is corrupt if size on disk does not equal `size`.
bool trim_to_size = false;
};
// The metadata that describes an SST file. (Does not need to extend
// LiveFileStorageInfo because SST files are always immutable.)
struct SstFileMetaData : public FileStorageInfo {
SstFileMetaData() {}
SstFileMetaData(const std::string& _file_name, uint64_t _file_number,
const std::string& _path, size_t _size,
const std::string& _directory, size_t _size,
SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno,
const std::string& _smallestkey,
const std::string& _largestkey, uint64_t _num_reads_sampled,
@ -42,11 +84,7 @@ struct SstFileMetaData {
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
std::string& _file_checksum,
std::string& _file_checksum_func_name)
: size(_size),
name(_file_name),
file_number(_file_number),
db_path(_path),
smallest_seqno(_smallest_seqno),
: smallest_seqno(_smallest_seqno),
largest_seqno(_largest_seqno),
smallestkey(_smallestkey),
largestkey(_largestkey),
@ -54,37 +92,44 @@ struct SstFileMetaData {
being_compacted(_being_compacted),
num_entries(0),
num_deletions(0),
temperature(_temperature),
oldest_blob_file_number(_oldest_blob_file_number),
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time),
file_checksum(_file_checksum),
file_checksum_func_name(_file_checksum_func_name) {}
// File size in bytes.
uint64_t size;
// The name of the file.
std::string name;
// The id of the file.
uint64_t file_number;
// The full path where the file locates.
std::string db_path;
SequenceNumber smallest_seqno; // Smallest sequence number in file.
SequenceNumber largest_seqno; // Largest sequence number in file.
std::string smallestkey; // Smallest user defined key in the file.
std::string largestkey; // Largest user defined key in the file.
uint64_t num_reads_sampled; // How many times the file is read.
bool being_compacted; // true if the file is currently being compacted.
uint64_t num_entries;
uint64_t num_deletions;
// This feature is experimental and subject to change.
Temperature temperature;
uint64_t oldest_blob_file_number; // The id of the oldest blob file
// referenced by the file.
file_creation_time(_file_creation_time) {
if (!_file_name.empty()) {
if (_file_name[0] == '/') {
relative_filename = _file_name.substr(1);
name = _file_name; // Deprecated field
} else {
relative_filename = _file_name;
name = std::string("/") + _file_name; // Deprecated field
}
assert(relative_filename.size() + 1 == name.size());
assert(relative_filename[0] != '/');
assert(name[0] == '/');
}
directory = _directory;
db_path = _directory; // Deprecated field
file_number = _file_number;
file_type = kTableFile;
size = _size;
temperature = _temperature;
file_checksum = _file_checksum;
file_checksum_func_name = _file_checksum_func_name;
}
SequenceNumber smallest_seqno = 0; // Smallest sequence number in file.
SequenceNumber largest_seqno = 0; // Largest sequence number in file.
std::string smallestkey; // Smallest user defined key in the file.
std::string largestkey; // Largest user defined key in the file.
uint64_t num_reads_sampled = 0; // How many times the file is read.
bool being_compacted =
false; // true if the file is currently being compacted.
uint64_t num_entries = 0;
uint64_t num_deletions = 0;
uint64_t oldest_blob_file_number = 0; // The id of the oldest blob file
// referenced by the file.
// An SST file may be generated by compactions whose input files may
// in turn be generated by earlier compactions. This is the creation time of
// the oldest SST file that is a compaction ancestor of this file.
@ -92,22 +137,18 @@ struct SstFileMetaData {
// 0 if the information is not available.
//
// Note: for TTL blob files, it contains the start of the expiration range.
uint64_t oldest_ancester_time;
uint64_t oldest_ancester_time = 0;
// Timestamp when the SST file is created, provided by
// SystemClock::GetCurrentTime(). 0 if the information is not available.
uint64_t file_creation_time;
uint64_t file_creation_time = 0;
// The checksum of a SST file, the value is decided by the file content and
// the checksum algorithm used for this SST file. The checksum function is
// identified by the file_checksum_func_name. If the checksum function is
// not specified, file_checksum is "0" by default.
std::string file_checksum;
// DEPRECATED: The name of the file within its directory with a
// leading slash (e.g. "/123456.sst"). Use relative_filename from base struct
// instead.
std::string name;
// The name of the checksum function used to generate the file checksum
// value. If file checksum is not enabled (e.g., sst_file_checksum_func is
// null), file_checksum_func_name is UnknownFileChecksumFuncName, which is
// "Unknown".
std::string file_checksum_func_name;
// DEPRECATED: replaced by `directory` in base struct
std::string db_path;
};
// The full set of metadata associated with each SST file.
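The SstFileMetaData constructor in this hunk keeps two spellings of the file name in sync: the new `relative_filename` (no leading slash) and the deprecated `name` (leading slash). A minimal standalone sketch of that normalization follows. `NormalizedName` and `NormalizeSstName` are hypothetical names used only for illustration; they are not part of RocksDB.

```cpp
#include <cassert>
#include <string>

// Hypothetical illustration type. In the diff, these two strings live
// directly on FileStorageInfo (relative_filename) and SstFileMetaData (name).
struct NormalizedName {
  std::string relative_filename;  // e.g. "123456.sst"
  std::string name;               // deprecated form, e.g. "/123456.sst"
};

// Accepts a file name with or without a leading '/' and derives both forms,
// mirroring the branch in the SstFileMetaData constructor.
NormalizedName NormalizeSstName(const std::string& file_name) {
  NormalizedName out;
  if (file_name.empty()) {
    return out;  // both fields stay empty, as in the constructor
  }
  if (file_name[0] == '/') {
    out.relative_filename = file_name.substr(1);
    out.name = file_name;
  } else {
    out.relative_filename = file_name;
    out.name = "/" + file_name;
  }
  // Same invariant the constructor asserts: the forms differ by one '/'.
  assert(out.relative_filename.size() + 1 == out.name.size());
  return out;
}
```

Callers that previously stripped the slash from `name` themselves can read `relative_filename` instead and join it with `directory` when a full path is needed.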

@ -1884,4 +1884,14 @@ struct CompactionServiceOptionsOverride {
std::shared_ptr<Statistics> statistics = nullptr;
};
#ifndef ROCKSDB_LITE
struct LiveFilesStorageInfoOptions {
// Whether to populate FileStorageInfo::file_checksum* or leave blank
bool include_checksum_info = false;
// Flushes memtables if total size in bytes of live WAL files is >= this
// number. Default: always force a flush without checking sizes.
uint64_t wal_size_for_flush = 0;
};
#endif // !ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
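The comment on `wal_size_for_flush` describes a simple threshold rule. The sketch below restates only that documented comparison in standalone form. `ShouldFlushForWalSize` is a hypothetical helper, and the real DBImpl logic may apply further conditions (the old checkpoint code, for example, always flushed under 2PC).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns true when the documented rule says memtables
// should be flushed, i.e. when the total size of live WAL files is >= the
// threshold. A threshold of 0 always flushes, since any total is >= 0.
bool ShouldFlushForWalSize(const std::vector<uint64_t>& live_wal_sizes,
                           uint64_t wal_size_for_flush) {
  uint64_t total = 0;
  for (uint64_t sz : live_wal_sizes) {
    total += sz;
  }
  return total >= wal_size_for_flush;
}
```

Under this reading, the default of 0 reproduces the "always force a flush" behavior without a special case, and a very large threshold effectively disables flushing.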

@ -18,6 +18,7 @@
#include "rocksdb/env.h"
#include "rocksdb/io_status.h"
#include "rocksdb/metadata.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
@ -284,16 +285,10 @@ struct RestoreOptions {
: keep_log_files(_keep_log_files) {}
};
struct BackupFileInfo {
// File name and path relative to the backup_dir directory.
std::string relative_filename;
// Size of the file in bytes, not including filesystem overheads.
uint64_t size;
};
using BackupID = uint32_t;
using BackupFileInfo = FileStorageInfo;
struct BackupInfo {
BackupID backup_id = 0U;
// Creation time, according to GetCurrentTime
@ -475,6 +470,9 @@ class BackupEngineAppendOnlyBase {
// Captures the state of the database by creating a new (latest) backup.
// On success (OK status), the BackupID of the new backup is saved to
// *new_backup_id when not nullptr.
// NOTE: db_paths and cf_paths are not supported for creating backups,
// and NotSupported will be returned when the DB (without WALs) uses more
// than one directory.
virtual IOStatus CreateNewBackup(const CreateBackupOptions& options, DB* db,
BackupID* new_backup_id = nullptr) {
return CreateNewBackupWithMetadata(options, db, "", new_backup_id);

@ -38,8 +38,11 @@ class Checkpoint {
// if WAL writing is not always enabled.
// Flush will always trigger if it is 2PC.
// sequence_number_ptr: if it is not nullptr, the value it points to will be
// set to the DB's sequence number. The default value of this parameter is
// nullptr.
// set to a sequence number guaranteed to be part of the DB, not necessarily
// the latest. The default value of this parameter is nullptr.
// NOTE: db_paths and cf_paths are not supported for creating checkpoints
// and NotSupported will be returned when the DB (without WALs) uses more
// than one directory.
virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush = 0,
uint64_t* sequence_number_ptr = nullptr);
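The NOTE above says NotSupported is returned when the DB, ignoring WALs, spans more than one directory. A minimal sketch of that check, assuming a simplified file record, is shown here. `SketchFileKind`, `SketchFileInfo`, and `UsesSingleNonWalDirectory` are hypothetical stand-ins for `FileType`, `LiveFileStorageInfo`, and the inline loop in the checkpoint implementation.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, reduced stand-ins for RocksDB's FileType and
// LiveFileStorageInfo; only the fields this check needs are kept.
enum class SketchFileKind { kTable, kManifest, kWal };

struct SketchFileInfo {
  std::string directory;
  SketchFileKind kind;
};

// Returns true when all non-WAL files share one directory (or there are
// none). WAL files are skipped because a separate wal_dir is allowed.
bool UsesSingleNonWalDirectory(const std::vector<SketchFileInfo>& infos) {
  std::unordered_set<std::string> dirs;
  for (const auto& info : infos) {
    if (info.kind != SketchFileKind::kWal) {
      dirs.insert(info.directory);
    }
  }
  return dirs.size() <= 1;
}
```

A caller would map a `false` result to a NotSupported status, which is what the checkpoint code in this commit does when it sees more than one distinct directory.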

@ -357,6 +357,12 @@ class StackableDB : public DB {
return db_->GetLiveFilesChecksumInfo(checksum_list);
}
virtual Status GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& opts,
std::vector<LiveFileStorageInfo>* files) override {
return db_->GetLiveFilesStorageInfo(opts, files);
}
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* cf_meta) override {
db_->GetColumnFamilyMetaData(column_family, cf_meta);

@ -537,13 +537,14 @@ class BackupEngineImpl {
//
// @param src If non-empty, the file is copied from this pathname.
// @param contents If non-empty, the file will be created with these contents.
IOStatus CopyOrCreateFile(
const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr, std::string* checksum_hex = nullptr,
uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {});
IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, uint64_t size_limit,
Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync,
RateLimiter* rate_limiter,
std::function<void()> progress_callback,
uint64_t* bytes_toward_next_callback,
uint64_t* size, std::string* checksum_hex);
IOStatus ReadFileAndComputeChecksum(const std::string& src,
const std::shared_ptr<FileSystem>& src_fs,
@ -1163,6 +1164,7 @@ IOStatus BackupEngineImpl::Initialize() {
#endif
CpuPriority current_priority = CpuPriority::kNormal;
CopyOrCreateWorkItem work_item;
uint64_t bytes_toward_next_callback = 0;
while (files_to_copy_or_create_.read(work_item)) {
CpuPriority priority = threads_cpu_priority_;
if (current_priority != priority) {
@ -1181,10 +1183,10 @@ IOStatus BackupEngineImpl::Initialize() {
CopyOrCreateResult result;
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.src_env, work_item.dst_env, work_item.src_env_options,
work_item.sync, work_item.rate_limiter, &result.size,
&result.checksum_hex, work_item.size_limit,
work_item.progress_callback);
work_item.size_limit, work_item.src_env, work_item.dst_env,
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
work_item.progress_callback, &bytes_toward_next_callback,
&result.size, &result.checksum_hex);
RecordTick(work_item.stats, BACKUP_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
@ -1324,7 +1326,6 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
EnvOptions src_raw_env_options(db_options);
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint(
db_options,
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
FileType) {
// custom checkpoint will switch to calling copy_file_cb after it sees
@ -1342,7 +1343,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
uint64_t size_bytes = 0;
IOStatus io_st;
if (type == kTableFile || type == kBlobFile) {
io_st = db_fs_->GetFileSize(src_dirname + fname, io_options_,
io_st = db_fs_->GetFileSize(src_dirname + "/" + fname, io_options_,
&size_bytes, nullptr);
}
EnvOptions src_env_options;
@ -1612,6 +1613,8 @@ void BackupEngineImpl::SetBackupInfoFromBackupMeta(
bool include_file_details) const {
*backup_info = BackupInfo(id, meta.GetTimestamp(), meta.GetSize(),
meta.GetNumberFiles(), meta.GetAppMetadata());
std::string dir = options_.backup_dir + "/" + kPrivateDirSlash +
ROCKSDB_NAMESPACE::ToString(id);
if (include_file_details) {
auto& file_details = backup_info->file_details;
file_details.reserve(meta.GetFiles().size());
@ -1619,6 +1622,15 @@ void BackupEngineImpl::SetBackupInfoFromBackupMeta(
BackupFileInfo& finfo = *file_details.emplace(file_details.end());
finfo.relative_filename = file_ptr->filename;
finfo.size = file_ptr->size;
finfo.directory = dir;
uint64_t number;
FileType type;
bool ok = ParseFileName(file_ptr->filename, &number, &type);
if (ok) {
finfo.file_number = number;
finfo.file_type = type;
}
// TODO: temperature, file_checksum, file_checksum_func_name
}
backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id));
backup_info->name_for_open.pop_back(); // remove trailing '/'
@ -1923,9 +1935,11 @@ IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
IOStatus BackupEngineImpl::CopyOrCreateFile(
const std::string& src, const std::string& dst, const std::string& contents,
Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
RateLimiter* rate_limiter, uint64_t* size, std::string* checksum_hex,
uint64_t size_limit, std::function<void()> progress_callback) {
uint64_t size_limit, Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
std::function<void()> progress_callback,
uint64_t* bytes_toward_next_callback, uint64_t* size,
std::string* checksum_hex) {
assert(src.empty() != contents.empty());
IOStatus io_s;
std::unique_ptr<FSWritableFile> dst_file;
@ -1967,7 +1981,6 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
}
Slice data;
uint64_t processed_buffer_size = 0;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return status_to_io_status(Status::Incomplete("Backup stopped"));
@ -1980,7 +1993,7 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
processed_buffer_size += buffer_to_read;
*bytes_toward_next_callback += data.size();
} else {
data = contents;
}
@ -2005,8 +2018,9 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kWrite);
}
if (processed_buffer_size > options_.callback_trigger_interval_size) {
processed_buffer_size -= options_.callback_trigger_interval_size;
while (*bytes_toward_next_callback >=
options_.callback_trigger_interval_size) {
*bytes_toward_next_callback -= options_.callback_trigger_interval_size;
std::lock_guard<std::mutex> lock(byte_report_mutex_);
progress_callback();
}
@ -2037,10 +2051,10 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
std::function<void()> progress_callback, const std::string& contents,
const std::string& src_checksum_func_name,
const std::string& src_checksum_str) {
assert(!fname.empty() && fname[0] == '/');
assert(contents.empty() != src_dir.empty());
std::string dst_relative = fname.substr(1);
std::string src_path = src_dir + "/" + fname;
std::string dst_relative;
std::string dst_relative_tmp;
std::string db_id;
std::string db_session_id;
@ -2073,8 +2087,8 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// Prepare db_session_id to add to the file name
// Ignore the returned status
// In the failed cases, db_id and db_session_id will be empty
GetFileDbIdentities(db_env_, src_env_options, src_dir + fname,
rate_limiter, &db_id, &db_session_id)
GetFileDbIdentities(db_env_, src_env_options, src_path, rate_limiter,
&db_id, &db_session_id)
.PermitUncheckedError();
}
// Calculate checksum if checksum and db session id are not available.
@ -2083,13 +2097,13 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// the shared_checksum directory.
if (checksum_hex.empty() && db_session_id.empty()) {
IOStatus io_s = ReadFileAndComputeChecksum(
src_dir + fname, db_fs_, src_env_options, size_limit, &checksum_hex);
src_path, db_fs_, src_env_options, size_limit, &checksum_hex);
if (!io_s.ok()) {
return io_s;
}
}
if (size_bytes == port::kMaxUint64) {
return IOStatus::NotFound("File missing: " + src_dir + fname);
return IOStatus::NotFound("File missing: " + src_path);
}
// dst_relative depends on the following conditions:
// 1) the naming scheme is kUseDbSessionId,
@ -2106,15 +2120,15 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// It uses original/legacy naming scheme.
// dst_relative will be of the form:
// shared_checksum/<file_number>_<checksum>_<size>.blob
dst_relative = GetSharedFileWithChecksum(dst_relative, checksum_hex,
size_bytes, db_session_id);
dst_relative = GetSharedFileWithChecksum(fname, checksum_hex, size_bytes,
db_session_id);
dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
} else if (shared) {
dst_relative_tmp = GetSharedFileRel(dst_relative, true);
dst_relative = GetSharedFileRel(dst_relative, false);
dst_relative_tmp = GetSharedFileRel(fname, true);
dst_relative = GetSharedFileRel(fname, false);
} else {
dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
dst_relative = GetPrivateFileRel(backup_id, false, fname);
}
// We copy into `temp_dest_path` and, once finished, rename it to
@ -2201,9 +2215,8 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// ID, but even in that case, we double check the file sizes in
// BackupMeta::AddFile.
} else {
IOStatus io_s = ReadFileAndComputeChecksum(src_dir + fname, db_fs_,
src_env_options,
size_limit, &checksum_hex);
IOStatus io_s = ReadFileAndComputeChecksum(
src_path, db_fs_, src_env_options, size_limit, &checksum_hex);
if (!io_s.ok()) {
return io_s;
}
@ -2229,10 +2242,10 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
copy_dest_path->c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
size_limit, stats, progress_callback, src_checksum_func_name,
checksum_hex, db_id, db_session_id);
src_dir.empty() ? "" : src_path, *copy_dest_path, contents, db_env_,
backup_env_, src_env_options, options_.sync, rate_limiter, size_limit,
stats, progress_callback, src_checksum_func_name, checksum_hex, db_id,
db_session_id);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
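Throughout this file the convention changes from file names carrying a leading slash (`src_dir + fname`) to slash-free names joined explicitly (`src_dir + "/" + fname`). The sketch below illustrates that convention together with the matching "take everything after the last slash" idiom. `JoinPath` and `BaseName` are hypothetical helper names for illustration and do not exist in RocksDB under these names.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: joins a directory and a slash-free file name the same
// way the updated code builds src_path.
std::string JoinPath(const std::string& dir, const std::string& fname) {
  assert(fname.empty() || fname[0] != '/');  // names no longer start with '/'
  return dir + "/" + fname;
}

// Hypothetical helper: returns the part after the last '/'. When there is no
// '/', find_last_of returns npos and npos + 1 wraps to 0, so the whole
// string is returned unchanged.
std::string BaseName(const std::string& path) {
  return path.substr(path.find_last_of('/') + 1);
}
```

Keeping the separator in one place avoids the earlier mix, where some call sites expected `"/CURRENT"` and others `"CURRENT"`.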

@ -25,6 +25,7 @@
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
@ -71,16 +72,6 @@ class DummyDB : public StackableDB {
DBOptions GetDBOptions() const override { return DBOptions(options_); }
using StackableDB::GetIntProperty;
bool GetIntProperty(ColumnFamilyHandle*, const Slice& property,
uint64_t* value) override {
if (property == DB::Properties::kMinLogNumberToKeep) {
*value = 1;
return true;
}
return false;
}
Status EnableFileDeletions(bool /*force*/) override {
EXPECT_TRUE(!deletions_enabled_);
deletions_enabled_ = true;
@ -93,14 +84,6 @@ class DummyDB : public StackableDB {
return Status::OK();
}
Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
bool /*flush_memtable*/ = true) override {
EXPECT_TRUE(!deletions_enabled_);
vec = live_files_;
*mfs = 100;
return Status::OK();
}
ColumnFamilyHandle* DefaultColumnFamily() const override { return nullptr; }
class DummyLogFile : public LogFile {
@ -134,12 +117,36 @@ class DummyDB : public StackableDB {
bool alive_;
}; // DummyLogFile
Status GetSortedWalFiles(VectorLogPtr& files) override {
EXPECT_TRUE(!deletions_enabled_);
files.resize(wal_files_.size());
for (size_t i = 0; i < files.size(); ++i) {
files[i].reset(
new DummyLogFile(wal_files_[i].first, wal_files_[i].second));
Status GetLiveFilesStorageInfo(
const LiveFilesStorageInfoOptions& opts,
std::vector<LiveFileStorageInfo>* files) override {
uint64_t number;
FileType type;
files->clear();
for (auto& f : live_files_) {
bool success = ParseFileName(f, &number, &type);
if (!success) {
return Status::InvalidArgument("Bad file name: " + f);
}
files->emplace_back();
LiveFileStorageInfo& info = files->back();
info.relative_filename = f;
info.directory = dbname_;
info.file_number = number;
info.file_type = type;
if (type == kDescriptorFile) {
info.size = 100; // See TestEnv::GetChildrenFileAttributes below
info.trim_to_size = true;
} else if (type == kCurrentFile) {
info.size = 0;
info.trim_to_size = true;
} else {
info.size = 200; // See TestEnv::GetChildrenFileAttributes below
}
if (opts.include_checksum_info) {
info.file_checksum = kUnknownFileChecksum;
info.file_checksum_func_name = kUnknownFileChecksumFuncName;
}
}
return Status::OK();
}
@ -148,8 +155,7 @@ class DummyDB : public StackableDB {
Status FlushWAL(bool /*sync*/) override { return Status::OK(); }
std::vector<std::string> live_files_;
// pair<filename, alive?>
std::vector<std::pair<std::string, bool>> wal_files_;
private:
Options options_;
std::string dbname_;
@ -319,7 +325,7 @@ class TestEnv : public EnvWrapper {
if (filename.find("MANIFEST") == 0) {
size_bytes = 100; // Match DummyDB::GetLiveFilesStorageInfo
}
r->push_back({dir + filename, size_bytes});
r->push_back({dir + "/" + filename, size_bytes});
}
return Status::OK();
}
@ -327,7 +333,7 @@ class TestEnv : public EnvWrapper {
}
Status GetFileSize(const std::string& path, uint64_t* size_bytes) override {
if (filenames_for_mocked_attrs_.size() > 0) {
auto fname = path.substr(path.find_last_of('/'));
auto fname = path.substr(path.find_last_of('/') + 1);
auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(),
filenames_for_mocked_attrs_.end(), fname);
if (filename_iter != filenames_for_mocked_attrs_.end()) {
@ -1125,6 +1131,11 @@ TEST_P(BackupEngineTestWithParam, OnlineIntegrationTest) {
// delete old data
DestroyDB(dbname_, options_);
// TODO: Implement & test db_paths support in backup (not supported in
// restore)
// options_.db_paths.emplace_back(dbname_, 500 * 1024);
// options_.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
OpenDBAndBackupEngine(true);
// write some data, backup, repeat
for (int i = 0; i < 5; ++i) {
@ -1191,9 +1202,8 @@ TEST_F(BackupEngineTest, NoDoubleCopy_And_AutoGC) {
test_backup_env_->SetLimitWrittenFiles(7);
test_backup_env_->ClearWrittenFiles();
test_db_env_->SetLimitWrittenFiles(0);
dummy_db_->live_files_ = {"/00010.sst", "/00011.sst", "/CURRENT",
"/MANIFEST-01"};
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
dummy_db_->live_files_ = {"00010.sst", "00011.sst", "CURRENT", "MANIFEST-01",
"00011.log"};
test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
std::vector<std::string> should_have_written = {
@ -1210,9 +1220,8 @@ TEST_F(BackupEngineTest, NoDoubleCopy_And_AutoGC) {
test_backup_env_->SetLimitWrittenFiles(6);
test_backup_env_->ClearWrittenFiles();
dummy_db_->live_files_ = {"/00010.sst", "/" + other_sst, "/CURRENT",
"/MANIFEST-01"};
dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}};
dummy_db_->live_files_ = {"00010.sst", other_sst, "CURRENT", "MANIFEST-01",
"00011.log"};
test_db_env_->SetFilenamesForMockedAttrs(dummy_db_->live_files_);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
// should not open 00010.sst - it's already there
@ -1411,8 +1420,10 @@ TEST_F(BackupEngineTest, CorruptFileMaintainSize) {
// under normal circumstance, there should be at least one nonempty file
while (file_size == 0) {
// get a random file in /private/1
ASSERT_OK(file_manager_->GetRandomFileInDir(backupdir_ + "/private/1",
&file_to_corrupt, &file_size));
assert(file_manager_
->GetRandomFileInDir(backupdir_ + "/private/1", &file_to_corrupt,
&file_size)
.ok());
// corrupt the file by replacing its content by file_size random bytes
ASSERT_OK(file_manager_->CorruptFile(file_to_corrupt, file_size));
}
@ -2894,13 +2905,23 @@ TEST_F(BackupEngineTest, OpenBackupAsReadOnlyDB) {
TEST_F(BackupEngineTest, ProgressCallbackDuringBackup) {
DestroyDB(dbname_, options_);
// Too big for this small DB
backupable_options_->callback_trigger_interval_size = 100000;
OpenDBAndBackupEngine(true);
FillDB(db_.get(), 0, 100);
bool is_callback_invoked = false;
ASSERT_OK(backup_engine_->CreateNewBackup(
db_.get(), true,
[&is_callback_invoked]() { is_callback_invoked = true; }));
ASSERT_FALSE(is_callback_invoked);
CloseBackupEngine();
// Easily small enough for this small DB
backupable_options_->callback_trigger_interval_size = 1000;
OpenBackupEngine();
ASSERT_OK(backup_engine_->CreateNewBackup(
db_.get(), true,
[&is_callback_invoked]() { is_callback_invoked = true; }));
ASSERT_TRUE(is_callback_invoked);
CloseDBAndBackupEngine();
DestroyDB(dbname_, options_);
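The updated test relies on the callback firing once per `callback_trigger_interval_size` bytes actually copied, with the running counter carried across files. A minimal sketch of that accounting, without the mutex and I/O of the real code, might look like this. `AccountCopiedBytes` and `CountCallbacks` are hypothetical helpers, and the zero-interval guard is an assumption of the sketch rather than something the source shows.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical helper: adds `copied` bytes to the running counter and fires
// `progress_callback` once per full `interval` accumulated. Returns the
// number of callbacks fired by this call.
int AccountCopiedBytes(uint64_t copied, uint64_t interval,
                       uint64_t* bytes_toward_next_callback,
                       const std::function<void()>& progress_callback) {
  assert(bytes_toward_next_callback != nullptr);
  *bytes_toward_next_callback += copied;
  if (interval == 0) {
    return 0;  // sketch-only guard: avoid looping forever on a zero interval
  }
  int fired = 0;
  while (*bytes_toward_next_callback >= interval) {
    *bytes_toward_next_callback -= interval;
    progress_callback();
    ++fired;
  }
  return fired;
}

// Hypothetical driver: "copies" `chunks` pieces of `chunk_size` bytes with a
// single shared counter and returns the total number of callback runs.
int CountCallbacks(int chunks, uint64_t chunk_size, uint64_t interval) {
  uint64_t pending = 0;
  int calls = 0;
  for (int i = 0; i < chunks; ++i) {
    AccountCopiedBytes(chunk_size, interval, &pending, [&calls]() { ++calls; });
  }
  return calls;
}
```

Because the counter tracks bytes read rather than buffer capacity, a small DB with a 100000-byte interval produces no callbacks, while a 1000-byte interval does, which is the pair of outcomes the test asserts.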

@ -15,6 +15,7 @@
#include <cinttypes>
#include <string>
#include <tuple>
#include <unordered_set>
#include <vector>
#include "db/wal_manager.h"
@ -25,7 +26,9 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/options.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/checkpoint.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
@ -118,28 +121,28 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
if (s.ok() || s.IsNotSupported()) {
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
fname.c_str());
return db_->GetFileSystem()->LinkFile(src_dirname + fname,
full_private_path + fname,
IOOptions(), nullptr);
return db_->GetFileSystem()->LinkFile(
src_dirname + "/" + fname, full_private_path + "/" + fname,
IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
return CopyFile(db_->GetFileSystem(), src_dirname + "/" + fname,
full_private_path + "/" + fname, size_limit_bytes,
db_options.use_fsync);
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetFileSystem(), full_private_path + fname,
contents, db_options.use_fsync);
return CreateFile(db_->GetFileSystem(),
full_private_path + "/" + fname, contents,
db_options.use_fsync);
} /* create_file_cb */,
&sequence_number, log_size_for_flush);
@ -182,7 +185,6 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
}
Status CheckpointImpl::CreateCustomCheckpoint(
const DBOptions& db_options,
std::function<Status(const std::string& src_dirname,
const std::string& src_fname, FileType type)>
link_file_cb,
@ -196,214 +198,80 @@ Status CheckpointImpl::CreateCustomCheckpoint(
create_file_cb,
uint64_t* sequence_number, uint64_t log_size_for_flush,
bool get_live_table_checksum) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
uint64_t min_log_num = port::kMaxUint64;
*sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
bool flush_memtable = true;
if (!db_options.allow_2pc) {
if (log_size_for_flush == port::kMaxUint64) {
flush_memtable = false;
} else if (log_size_for_flush > 0) {
// If out standing log files are small, we skip the flush.
s = db_->GetSortedWalFiles(live_wal_files);
if (!s.ok()) {
return s;
}
// Don't flush column families if total log size is smaller than
// log_size_for_flush. We copy the log files instead.
// We may be able to cover 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < log_size_for_flush) {
flush_memtable = false;
}
live_wal_files.clear();
}
}
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep, &min_log_num)) {
return Status::InvalidArgument("cannot get the min log number to keep.");
}
// Between GetLiveFiles and getting min_log_num, flush might happen
// concurrently, so new WAL deletions might be tracked in MANIFEST. If we do
// not get the new MANIFEST size, the deleted WALs might not be reflected in
// the checkpoint's MANIFEST.
//
// If we get min_log_num before the above GetLiveFiles, then there might
// be too many unnecessary WALs to be included in the checkpoint.
//
// Ideally, min_log_num should be got together with manifest_file_size in
// GetLiveFiles atomically. But that needs changes to GetLiveFiles' signature
// which is a public API.
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:FlushDone");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
if (s.ok()) {
s = db_->FlushWAL(false /* sync */);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1");
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2");
// if we have more than one column family, we need to also get WAL files
if (s.ok()) {
s = db_->GetSortedWalFiles(live_wal_files);
}
if (!s.ok()) {
return s;
}
size_t wal_size = live_wal_files.size();
LiveFilesStorageInfoOptions opts;
opts.include_checksum_info = get_live_table_checksum;
opts.wal_size_for_flush = log_size_for_flush;
// process live files, non-table, non-blob files first
std::string manifest_fname, current_fname;
// record table and blob files for processing next
std::vector<std::tuple<std::string, uint64_t, FileType>>
live_table_and_blob_files;
for (auto& live_file : live_files) {
std::vector<LiveFileStorageInfo> infos;
{
Status s = db_->GetLiveFilesStorageInfo(opts, &infos);
if (!s.ok()) {
break;
}
uint64_t number;
FileType type;
bool ok = ParseFileName(live_file, &number, &type);
if (!ok) {
s = Status::Corruption("Can't parse file name. This is very bad");
break;
}
// we should only get sst, blob, options, manifest and current files here
assert(type == kTableFile || type == kBlobFile || type == kDescriptorFile ||
type == kCurrentFile || type == kOptionsFile);
assert(live_file.size() > 0 && live_file[0] == '/');
if (type == kCurrentFile) {
// We will craft the current file manually to ensure it's consistent with
// the manifest number. This is necessary because current's file contents
// can change during checkpoint creation.
current_fname = live_file;
continue;
} else if (type == kDescriptorFile) {
manifest_fname = live_file;
return s;
}
}
if (type != kTableFile && type != kBlobFile) {
// copy non-table, non-blob files here
// * if it's kDescriptorFile, limit the size to manifest_file_size
s = copy_file_cb(db_->GetName(), live_file,
(type == kDescriptorFile) ? manifest_file_size : 0, type,
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
} else {
// process table and blob files below
live_table_and_blob_files.emplace_back(live_file, number, type);
// Verify that everything except WAL files is in the same directory
// (db_paths / cf_paths not supported)
std::unordered_set<std::string> dirs;
for (auto& info : infos) {
if (info.file_type != kWalFile) {
dirs.insert(info.directory);
}
}
// get checksum info for table and blob files.
// get table and blob file checksums if get_live_table_checksum is true
std::unique_ptr<FileChecksumList> checksum_list;
if (s.ok() && get_live_table_checksum) {
checksum_list.reset(NewFileChecksumList());
// should succeed even without checksum info present, else manifest
// is corrupt
s = GetFileChecksumsFromManifest(db_->GetEnv(),
db_->GetName() + manifest_fname,
manifest_file_size, checksum_list.get());
if (dirs.size() > 1) {
return Status::NotSupported(
"db_paths / cf_paths not supported for Checkpoint nor BackupEngine");
}
// copy/hard link live table and blob files
for (const auto& file : live_table_and_blob_files) {
if (!s.ok()) {
break;
}
bool same_fs = true;
const std::string& src_fname = std::get<0>(file);
const uint64_t number = std::get<1>(file);
const FileType type = std::get<2>(file);
// rules:
// * for kTableFile/kBlobFile, attempt hard link instead of copy.
// * but can't hard link across filesystems.
if (same_fs) {
s = link_file_cb(db_->GetName(), src_fname, type);
if (s.IsNotSupported()) {
same_fs = false;
s = Status::OK();
}
}
if (!same_fs) {
std::string checksum_name = kUnknownFileChecksumFuncName;
std::string checksum_value = kUnknownFileChecksum;
// we ignore the checksums either they are not required or we failed to
// obtain the checksum list for old table files that have no file
// checksums
if (get_live_table_checksum) {
// find checksum info for table files
Status search = checksum_list->SearchOneFileChecksum(
number, &checksum_value, &checksum_name);
// could be a legacy file lacking checksum info. overall OK if
// not found
if (!search.ok()) {
assert(checksum_name == kUnknownFileChecksumFuncName);
assert(checksum_value == kUnknownFileChecksum);
}
for (auto& info : infos) {
Status s;
if (!info.replacement_contents.empty()) {
// Currently should only be used for CURRENT file.
assert(info.file_type == kCurrentFile);
if (info.size != info.replacement_contents.size()) {
s = Status::Corruption("Inconsistent size metadata for " +
info.relative_filename);
} else {
s = create_file_cb(info.relative_filename, info.replacement_contents,
info.file_type);
}
s = copy_file_cb(db_->GetName(), src_fname, 0, type, checksum_name,
checksum_value);
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
s = create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
kCurrentFile);
}
ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());
// Link WAL files. Copy exact size of last one because it is the only one
// that has changes after the last flush.
ImmutableDBOptions ioptions(db_options);
auto wal_dir = ioptions.GetWalDir();
for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), kWalFile,
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
break;
}
if (same_fs) {
// we only care about live log files
s = link_file_cb(wal_dir, live_wal_files[i]->PathName(), kWalFile);
} else {
if (same_fs && !info.trim_to_size) {
s = link_file_cb(info.directory, info.relative_filename,
info.file_type);
if (s.IsNotSupported()) {
same_fs = false;
s = Status::OK();
}
s.MustCheck();
}
if (!same_fs) {
s = copy_file_cb(wal_dir, live_wal_files[i]->PathName(), 0, kWalFile,
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
if (!same_fs || info.trim_to_size) {
assert(info.file_checksum_func_name.empty() ==
!opts.include_checksum_info);
// no assertion on file_checksum because empty is used for both "not
// set" and "unknown"
if (opts.include_checksum_info) {
s = copy_file_cb(info.directory, info.relative_filename, info.size,
info.file_type, info.file_checksum_func_name,
info.file_checksum);
} else {
s = copy_file_cb(info.directory, info.relative_filename, info.size,
info.file_type, kUnknownFileChecksumFuncName,
kUnknownFileChecksum);
}
}
}
if (!s.ok()) {
return s;
}
}
return s;
return Status::OK();
}
// Exports all live SST files of a specified Column Family onto export_dir,

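The added lines in the hunk above replace the separate table/blob and WAL loops with one loop over `infos`: a file is hard linked while linking is still believed to work and the file does not need trimming, and is copied otherwise. The following is a minimal, self-contained sketch of that control flow only. `Code`, `FileInfo`, `FileCb`, and `LinkOrCopyAll` are hypothetical stand-ins invented for illustration; they are not RocksDB types, and the real code uses `Status`, `LiveFileStorageInfo`, and the `link_file_cb` / `copy_file_cb` callbacks shown in the diff.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a status code; not rocksdb::Status.
enum class Code { kOk, kNotSupported, kIOError };

// Hypothetical stand-in for the per-file metadata used by the loop.
struct FileInfo {
  std::string name;
  bool trim_to_size;  // true for a file that must be copied up to a fixed size
};

using FileCb = std::function<Code(const FileInfo&)>;

// Hard links each file when possible. Once a link attempt reports
// kNotSupported, linking is abandoned and that file and all later files
// are copied. Files marked trim_to_size are always copied.
Code LinkOrCopyAll(const std::vector<FileInfo>& infos, const FileCb& link_cb,
                   const FileCb& copy_cb) {
  bool same_fs = true;
  for (const FileInfo& info : infos) {
    Code c = Code::kOk;
    if (same_fs && !info.trim_to_size) {
      c = link_cb(info);
      if (c == Code::kNotSupported) {
        // Assume a cross-filesystem target: stop linking, fall back to copy.
        same_fs = false;
        c = Code::kOk;
      }
    }
    if (!same_fs || info.trim_to_size) {
      c = copy_cb(info);
    }
    if (c != Code::kOk) {
      return c;  // first hard failure aborts the whole operation
    }
  }
  return Code::kOk;
}
```

In this sketch a link failure other than `kNotSupported` is returned immediately without attempting a copy, which mirrors the `if (!s.ok()) return s;` check at the end of the loop body in the diff.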
@@ -29,7 +29,6 @@ class CheckpointImpl : public Checkpoint {
// Checkpoint logic can be customized by providing callbacks for link, copy,
// or create.
Status CreateCustomCheckpoint(
const DBOptions& db_options,
std::function<Status(const std::string& src_dirname,
const std::string& fname, FileType type)>
link_file_cb,

@@ -902,6 +902,19 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) {
delete snapshot_db;
}
TEST_F(CheckpointTest, CheckpointWithDbPath) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_ + "_2", 0);
Reopen(options);
ASSERT_OK(Put("key1", "val1"));
Flush();
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
// Currently not supported
ASSERT_TRUE(checkpoint->CreateCheckpoint(snapshot_name_).IsNotSupported());
delete checkpoint;
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
