Add manifest fix-up utility for file temperatures (#9683)

Summary:
The goal of this change is to allow changes to the "current" (in
FileSystem) file temperatures to feed back into DB metadata, so that
they can inform decisions and stats reporting. In part because of
modular code factoring, it doesn't seem easy to do this automagically,
where opening an SST file and observing current Temperature different
from expected would trigger a change in metadata and DB manifest write
(essentially giving the deep read path access to the write path). It is also
difficult to do this while the DB is open because of the limitations of
LogAndApply.

This change allows updating file temperature metadata on a closed DB
using an experimental utility function UpdateManifestForFilesState()
or `ldb update_manifest --update_temperatures`. This should suffice for
"migration" scenarios where outside tooling has placed or re-arranged DB
files into a (different) tiered configuration without going through
RocksDB itself (currently, only compaction can change temperature
metadata).

Some details:
* Refactored and added unit test for `ldb unsafe_remove_sst_file` because
of shared functionality
* Pulled in autovector.h changes from https://github.com/facebook/rocksdb/issues/9546 to fix SuperVersionContext
move constructor (related to an older draft of this change)

Possible follow-up work:
* Support updating manifest with file checksums, such as when a
new checksum function is used and want existing DB metadata updated
for it.
* It's possible that for some repair scenarios, lighter weight than
full repair, we might want to support UpdateManifestForFilesState() to
modify critical file details like size or checksum using same
algorithm. But let's make sure these are differentiated from modifying
file details in ways that don't suspect corruption (or require extreme
trust).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9683

Test Plan: unit tests added

Reviewed By: jay-zhuang

Differential Revision: D34798828

Pulled By: pdillinger

fbshipit-source-id: cfd83e8fb10761d8c9e7f9c020d68c9106a95554
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent b2aacaf923
commit a8a422e962
  1. 1
      HISTORY.md
  2. 2
      Makefile
  3. 100
      db/db_test2.cc
  4. 100
      db/experimental.cc
  5. 2
      db/internal_stats.cc
  6. 5
      db/job_context.h
  7. 68
      db/version_util.h
  8. 27
      include/rocksdb/experimental.h
  9. 97
      tools/ldb_cmd.cc
  10. 22
      tools/ldb_cmd_impl.h
  11. 197
      tools/ldb_cmd_test.cc
  12. 1
      tools/ldb_tool.cc
  13. 22
      util/autovector.h

@ -9,6 +9,7 @@
* Added BlobDB options to `ldb` * Added BlobDB options to `ldb`
* `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`. * `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`.
* Automatically recover from retryable read IO errors during backgorund flush/compaction. * Automatically recover from retryable read IO errors during backgorund flush/compaction.
* Experimental support for preserving file Temperatures through backup and restore, and for updating DB metadata for outside changes to file Temperature (`UpdateManifestForFilesState` or `ldb update_manifest --update_temperatures`).
### Bug Fixes ### Bug Fixes
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496) * Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)

@ -909,7 +909,7 @@ gen_parallel_tests:
# 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest # 107.816 PASS t/DBTest.EncodeDecompressedBlockSizeTest
# #
slow_test_regexp = \ slow_test_regexp = \
^.*SnapshotConcurrentAccessTest.*$$|^.*SeqAdvanceConcurrentTest.*$$|^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$ ^.*MySQLStyleTransactionTest.*$$|^.*SnapshotConcurrentAccessTest.*$$|^.*SeqAdvanceConcurrentTest.*$$|^t/run-table_test-HarnessTest.Randomized$$|^t/run-db_test-.*(?:FileCreationRandomFailure|EncodeDecompressedBlockSizeTest)$$|^.*RecoverFromCorruptedWALWithoutFlush$$
prioritize_long_running_tests = \ prioritize_long_running_tests = \
perl -pe 's,($(slow_test_regexp)),100 $$1,' \ perl -pe 's,($(slow_test_regexp)),100 $$1,' \
| sort -k1,1gr \ | sort -k1,1gr \

@ -17,6 +17,7 @@
#include "options/options_helper.h" #include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/experimental.h"
#include "rocksdb/iostats_context.h" #include "rocksdb/iostats_context.h"
#include "rocksdb/persistent_cache.h" #include "rocksdb/persistent_cache.h"
#include "rocksdb/trace_record.h" #include "rocksdb/trace_record.h"
@ -6973,6 +6974,105 @@ TEST_F(DBTest2, CheckpointFileTemperature) {
delete checkpoint; delete checkpoint;
Close(); Close();
} }
TEST_F(DBTest2, FileTemperatureManifestFixup) {
auto test_fs = std::make_shared<FileTemperatureTestFS>(env_->GetFileSystem());
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
Options options = CurrentOptions();
options.bottommost_temperature = Temperature::kWarm;
options.level0_file_num_compaction_trigger = 2;
options.env = env.get();
std::vector<std::string> cfs = {/*"default",*/ "test1", "test2"};
CreateAndReopenWithCF(cfs, options);
// Needed for later re-opens (weird)
cfs.insert(cfs.begin(), kDefaultColumnFamilyName);
// Generate a bottommost file in all CFs
for (int cf = 0; cf < 3; ++cf) {
ASSERT_OK(Put(cf, "a", "val"));
ASSERT_OK(Put(cf, "c", "val"));
ASSERT_OK(Flush(cf));
ASSERT_OK(Put(cf, "b", "val"));
ASSERT_OK(Put(cf, "d", "val"));
ASSERT_OK(Flush(cf));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify
ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
// Generate a non-bottommost file in all CFs
for (int cf = 0; cf < 3; ++cf) {
ASSERT_OK(Put(cf, "e", "val"));
ASSERT_OK(Flush(cf));
}
// re-verify
ASSERT_GT(GetSstSizeHelper(Temperature::kWarm), 0);
// Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
// Now change FS temperature on bottommost file(s) to kCold
std::map<uint64_t, Temperature> current_temps;
test_fs->CopyCurrentSstFileTemperatures(&current_temps);
for (auto e : current_temps) {
if (e.second == Temperature::kWarm) {
test_fs->OverrideSstFileTemperature(e.first, Temperature::kCold);
}
}
// Metadata not yet updated
ASSERT_EQ(Get("a"), "val");
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
// Update with Close and UpdateManifestForFilesState, but first save cf
// descriptors
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < handles_.size(); ++i) {
ColumnFamilyDescriptor cfdescriptor;
// GetDescriptor is not implemented for ROCKSDB_LITE
handles_[i]->GetDescriptor(&cfdescriptor).PermitUncheckedError();
column_families.push_back(cfdescriptor);
}
Close();
experimental::UpdateManifestForFilesStateOptions update_opts;
update_opts.update_temperatures = true;
ASSERT_OK(experimental::UpdateManifestForFilesState(
options, dbname_, column_families, update_opts));
// Re-open and re-verify after update
ReopenWithColumnFamilies(cfs, options);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
// Not supported: ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kHot), 0);
// Change kUnknown to kHot
test_fs->CopyCurrentSstFileTemperatures(&current_temps);
for (auto e : current_temps) {
if (e.second == Temperature::kUnknown) {
test_fs->OverrideSstFileTemperature(e.first, Temperature::kHot);
}
}
// Update with Close and UpdateManifestForFilesState
Close();
ASSERT_OK(experimental::UpdateManifestForFilesState(
options, dbname_, column_families, update_opts));
// Re-open and re-verify after update
ReopenWithColumnFamilies(cfs, options);
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kUnknown), 0);
ASSERT_EQ(GetSstSizeHelper(Temperature::kWarm), 0);
ASSERT_GT(GetSstSizeHelper(Temperature::kHot), 0);
Close();
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery. // WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.

@ -6,6 +6,8 @@
#include "rocksdb/experimental.h" #include "rocksdb/experimental.h"
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/version_util.h"
#include "logging/logging.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace experimental { namespace experimental {
@ -46,5 +48,103 @@ Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end); return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
} }
Status UpdateManifestForFilesState(
const DBOptions& db_opts, const std::string& db_name,
const std::vector<ColumnFamilyDescriptor>& column_families,
const UpdateManifestForFilesStateOptions& opts) {
OfflineManifestWriter w(db_opts, db_name);
Status s = w.Recover(column_families);
size_t files_updated = 0;
size_t cfs_updated = 0;
auto fs = db_opts.env->GetFileSystem();
for (auto cfd : *w.Versions().GetColumnFamilySet()) {
if (!s.ok()) {
break;
}
assert(cfd);
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
const auto* current = cfd->current();
assert(current);
const auto* vstorage = current->storage_info();
assert(vstorage);
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
/* SST files */
for (int level = 0; level < cfd->NumberLevels(); level++) {
if (!s.ok()) {
break;
}
const auto& level_files = vstorage->LevelFiles(level);
for (const auto& lf : level_files) {
assert(lf);
uint64_t number = lf->fd.GetNumber();
std::string fname =
TableFileName(w.IOptions().db_paths, number, lf->fd.GetPathId());
std::unique_ptr<FSSequentialFile> f;
FileOptions fopts;
fopts.temperature = lf->temperature;
IOStatus file_ios =
fs->NewSequentialFile(fname, fopts, &f, /*dbg*/ nullptr);
if (file_ios.ok()) {
if (opts.update_temperatures) {
Temperature temp = f->GetTemperature();
if (temp != Temperature::kUnknown && temp != lf->temperature) {
// Current state inconsistent with manifest
++files_updated;
edit.DeleteFile(level, number);
edit.AddFile(level, number, lf->fd.GetPathId(),
lf->fd.GetFileSize(), lf->smallest, lf->largest,
lf->fd.smallest_seqno, lf->fd.largest_seqno,
lf->marked_for_compaction, temp,
lf->oldest_blob_file_number,
lf->oldest_ancester_time, lf->file_creation_time,
lf->file_checksum, lf->file_checksum_func_name,
lf->min_timestamp, lf->max_timestamp);
}
}
} else {
s = file_ios;
break;
}
}
}
if (s.ok() && edit.NumEntries() > 0) {
s = w.LogAndApply(cfd, &edit);
if (s.ok()) {
++cfs_updated;
}
}
}
if (cfs_updated > 0) {
ROCKS_LOG_INFO(db_opts.info_log,
"UpdateManifestForFilesState: updated %zu files in %zu CFs",
files_updated, cfs_updated);
} else if (s.ok()) {
ROCKS_LOG_INFO(db_opts.info_log,
"UpdateManifestForFilesState: no updates needed");
}
if (!s.ok()) {
ROCKS_LOG_ERROR(db_opts.info_log, "UpdateManifestForFilesState failed: %s",
s.ToString().c_str());
}
return s;
}
} // namespace experimental } // namespace experimental
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -519,7 +519,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
{false, nullptr, &InternalStats::HandleLiveSstFilesSize, nullptr, {false, nullptr, &InternalStats::HandleLiveSstFilesSize, nullptr,
nullptr}}, nullptr}},
{DB::Properties::kLiveSstFilesSizeAtTemperature, {DB::Properties::kLiveSstFilesSizeAtTemperature,
{true, &InternalStats::HandleLiveSstFilesSizeAtTemperature, nullptr, {false, &InternalStats::HandleLiveSstFilesSizeAtTemperature, nullptr,
nullptr, nullptr}}, nullptr, nullptr}},
{DB::Properties::kEstimatePendingCompactionBytes, {DB::Properties::kEstimatePendingCompactionBytes,
{false, nullptr, &InternalStats::HandleEstimatePendingCompactionBytes, {false, nullptr, &InternalStats::HandleEstimatePendingCompactionBytes,

@ -37,13 +37,16 @@ struct SuperVersionContext {
explicit SuperVersionContext(bool create_superversion = false) explicit SuperVersionContext(bool create_superversion = false)
: new_superversion(create_superversion ? new SuperVersion() : nullptr) {} : new_superversion(create_superversion ? new SuperVersion() : nullptr) {}
explicit SuperVersionContext(SuperVersionContext&& other) explicit SuperVersionContext(SuperVersionContext&& other) noexcept
: superversions_to_free(std::move(other.superversions_to_free)), : superversions_to_free(std::move(other.superversions_to_free)),
#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION #ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION
write_stall_notifications(std::move(other.write_stall_notifications)), write_stall_notifications(std::move(other.write_stall_notifications)),
#endif #endif
new_superversion(std::move(other.new_superversion)) { new_superversion(std::move(other.new_superversion)) {
} }
// No copies
SuperVersionContext(const SuperVersionContext& other) = delete;
void operator=(const SuperVersionContext& other) = delete;
void NewSuperVersion() { void NewSuperVersion() {
new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion()); new_superversion = std::unique_ptr<SuperVersion>(new SuperVersion());

@ -0,0 +1,68 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "db/version_set.h"
namespace ROCKSDB_NAMESPACE {
// Instead of opening a `DB` to perform certain manifest updates, this
// uses the underlying `VersionSet` API to read and modify the MANIFEST. This
// allows us to use the user's real options, while not having to worry about
// the DB persisting new SST files via flush/compaction or attempting to read/
// compact files which may fail, particularly for the file we intend to remove
// (the user may want to remove an already deleted file from MANIFEST).
class OfflineManifestWriter {
public:
OfflineManifestWriter(const DBOptions& options, const std::string& db_path)
: wc_(options.delayed_write_rate),
wb_(options.db_write_buffer_size),
immutable_db_options_(WithDbPath(options, db_path)),
tc_(NewLRUCache(1 << 20 /* capacity */,
options.table_cache_numshardbits)),
versions_(db_path, &immutable_db_options_, sopt_, tc_.get(), &wb_, &wc_,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "") {}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) {
return versions_.Recover(column_families);
}
Status LogAndApply(ColumnFamilyData* cfd, VersionEdit* edit) {
// Use `mutex` to imitate a locked DB mutex when calling `LogAndApply()`.
InstrumentedMutex mutex;
mutex.Lock();
Status s = versions_.LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
edit, &mutex, nullptr /* db_directory */,
false /* new_descriptor_log */);
mutex.Unlock();
return s;
}
VersionSet& Versions() { return versions_; }
const ImmutableDBOptions& IOptions() { return immutable_db_options_; }
private:
WriteController wc_;
WriteBufferManager wb_;
ImmutableDBOptions immutable_db_options_;
std::shared_ptr<Cache> tc_;
EnvOptions sopt_;
VersionSet versions_;
static ImmutableDBOptions WithDbPath(const DBOptions& options,
const std::string& db_path) {
ImmutableDBOptions rv(options);
if (rv.db_paths.empty()) {
// `VersionSet` expects options that have been through
// `SanitizeOptions()`, which would sanitize an empty `db_paths`.
rv.db_paths.emplace_back(db_path, 0 /* target_size */);
}
return rv;
}
};
} // namespace ROCKSDB_NAMESPACE

@ -25,5 +25,32 @@ Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end);
Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, Status PromoteL0(DB* db, ColumnFamilyHandle* column_family,
int target_level = 1); int target_level = 1);
struct UpdateManifestForFilesStateOptions {
// When true, read current file temperatures from FileSystem and update in
// DB manifest when a temperature other than Unknown is reported and
// inconsistent with manifest.
bool update_temperatures = true;
// TODO: new_checksums: to update files to latest file checksum algorithm
};
// Utility for updating manifest of DB directory (not open) for current state
// of files on filesystem. See UpdateManifestForFilesStateOptions.
//
// To minimize interference with ongoing DB operations, only the following
// guarantee is provided, assuming no IO error encountered:
// * Only files live in DB at start AND end of call to
// UpdateManifestForFilesState() are guaranteed to be updated (as needed) in
// manifest.
// * For example, new files after start of call to
// UpdateManifestForFilesState() might not be updated, but that is not
// typically required to achieve goal of manifest consistency/completeness
// (because current DB configuration would ensure new files get the desired
// consistent metadata).
Status UpdateManifestForFilesState(
const DBOptions& db_opts, const std::string& db_name,
const std::vector<ColumnFamilyDescriptor>& column_families,
const UpdateManifestForFilesStateOptions& opts = {});
} // namespace experimental } // namespace experimental
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -21,11 +21,14 @@
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/version_util.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "file/filename.h" #include "file/filename.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/experimental.h"
#include "rocksdb/file_checksum.h" #include "rocksdb/file_checksum.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "rocksdb/utilities/backup_engine.h" #include "rocksdb/utilities/backup_engine.h"
#include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/checkpoint.h"
@ -301,6 +304,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
return new UnsafeRemoveSstFileCommand(parsed_params.cmd_params, return new UnsafeRemoveSstFileCommand(parsed_params.cmd_params,
parsed_params.option_map, parsed_params.option_map,
parsed_params.flags); parsed_params.flags);
} else if (parsed_params.cmd == UpdateManifestCommand::Name()) {
return new UpdateManifestCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
} }
return nullptr; return nullptr;
} }
@ -3985,49 +3992,27 @@ UnsafeRemoveSstFileCommand::UnsafeRemoveSstFileCommand(
} }
void UnsafeRemoveSstFileCommand::DoCommand() { void UnsafeRemoveSstFileCommand::DoCommand() {
// Instead of opening a `DB` and calling `DeleteFile()`, this implementation
// uses the underlying `VersionSet` API to read and modify the MANIFEST. This
// allows us to use the user's real options, while not having to worry about
// the DB persisting new SST files via flush/compaction or attempting to read/
// compact files which may fail, particularly for the file we intend to remove
// (the user may want to remove an already deleted file from MANIFEST).
PrepareOptions(); PrepareOptions();
if (options_.db_paths.empty()) { OfflineManifestWriter w(options_, db_path_);
// `VersionSet` expects options that have been through `SanitizeOptions()`, if (column_families_.empty()) {
// which would sanitize an empty `db_paths`. column_families_.emplace_back(kDefaultColumnFamilyName, options_);
options_.db_paths.emplace_back(db_path_, 0 /* target_size */);
} }
Status s = w.Recover(column_families_);
WriteController wc(options_.delayed_write_rate);
WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_);
std::shared_ptr<Cache> tc(
NewLRUCache(1 << 20 /* capacity */, options_.table_cache_numshardbits));
EnvOptions sopt;
VersionSet versions(db_path_, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
/*db_session_id*/ "");
Status s = versions.Recover(column_families_);
ColumnFamilyData* cfd = nullptr; ColumnFamilyData* cfd = nullptr;
int level = -1; int level = -1;
if (s.ok()) { if (s.ok()) {
FileMetaData* metadata = nullptr; FileMetaData* metadata = nullptr;
s = versions.GetMetadataForFile(sst_file_number_, &level, &metadata, &cfd); s = w.Versions().GetMetadataForFile(sst_file_number_, &level, &metadata,
&cfd);
} }
if (s.ok()) { if (s.ok()) {
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(level, sst_file_number_); edit.DeleteFile(level, sst_file_number_);
// Use `mutex` to imitate a locked DB mutex when calling `LogAndApply()`. s = w.LogAndApply(cfd, &edit);
InstrumentedMutex mutex;
mutex.Lock();
s = versions.LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
&mutex, nullptr /* db_directory */,
false /* new_descriptor_log */);
mutex.Unlock();
} }
if (!s.ok()) { if (!s.ok()) {
@ -4038,5 +4023,59 @@ void UnsafeRemoveSstFileCommand::DoCommand() {
} }
} }
const std::string UpdateManifestCommand::ARG_VERBOSE = "verbose";
const std::string UpdateManifestCommand::ARG_UPDATE_TEMPERATURES =
"update_temperatures";
void UpdateManifestCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(UpdateManifestCommand::Name());
ret.append(" [--update_temperatures]");
ret.append(" ");
ret.append(" MUST NOT be used on a live DB.");
ret.append("\n");
}
UpdateManifestCommand::UpdateManifestCommand(
const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, false /* is_read_only */,
BuildCmdLineOptions({ARG_VERBOSE, ARG_UPDATE_TEMPERATURES})) {
verbose_ = IsFlagPresent(flags, ARG_VERBOSE) ||
ParseBooleanOption(options, ARG_VERBOSE, false);
update_temperatures_ =
IsFlagPresent(flags, ARG_UPDATE_TEMPERATURES) ||
ParseBooleanOption(options, ARG_UPDATE_TEMPERATURES, false);
if (!update_temperatures_) {
exec_state_ = LDBCommandExecuteResult::Failed(
"No action like --update_temperatures specified for update_manifest");
}
}
void UpdateManifestCommand::DoCommand() {
PrepareOptions();
auto level = verbose_ ? InfoLogLevel::INFO_LEVEL : InfoLogLevel::WARN_LEVEL;
options_.info_log.reset(new StderrLogger(level));
experimental::UpdateManifestForFilesStateOptions opts;
opts.update_temperatures = update_temperatures_;
if (column_families_.empty()) {
column_families_.emplace_back(kDefaultColumnFamilyName, options_);
}
Status s = experimental::UpdateManifestForFilesState(options_, db_path_,
column_families_);
if (!s.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"failed to update manifest: " + s.ToString());
} else {
exec_state_ =
LDBCommandExecuteResult::Succeed("Manifest updates successful");
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -190,6 +190,28 @@ class ManifestDumpCommand : public LDBCommand {
static const std::string ARG_PATH; static const std::string ARG_PATH;
}; };
class UpdateManifestCommand : public LDBCommand {
public:
static std::string Name() { return "update_manifest"; }
UpdateManifestCommand(const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
static void Help(std::string& ret);
virtual void DoCommand() override;
virtual bool NoDBOpen() override { return true; }
private:
bool verbose_;
bool update_temperatures_;
// TODO future: checksum_func for populating checksums
static const std::string ARG_VERBOSE;
static const std::string ARG_UPDATE_TEMPERATURES;
};
class FileChecksumDumpCommand : public LDBCommand { class FileChecksumDumpCommand : public LDBCommand {
public: public:
static std::string Name() { return "file_checksum_dump"; } static std::string Name() { return "file_checksum_dump"; }

@ -4,16 +4,21 @@
// (found in the LICENSE.Apache file in the root directory). // (found in the LICENSE.Apache file in the root directory).
// //
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "rocksdb/utilities/ldb_cmd.h" #include "rocksdb/utilities/ldb_cmd.h"
#include <cinttypes>
#include "db/db_test_util.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "env/composite_env_wrapper.h" #include "env/composite_env_wrapper.h"
#include "file/filename.h" #include "file/filename.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/advanced_options.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/file_checksum.h" #include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/options_util.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
@ -920,6 +925,195 @@ TEST_F(LdbCmdTest, LoadCFOptionsAndOverride) {
ASSERT_EQ(column_families[1].options.write_buffer_size, 268435456); ASSERT_EQ(column_families[1].options.write_buffer_size, 268435456);
} }
TEST_F(LdbCmdTest, UnsafeRemoveSstFile) {
Options opts;
opts.level0_file_num_compaction_trigger = 10;
opts.create_if_missing = true;
DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(Env::Default(), "ldb_cmd_test");
DestroyDB(dbname, opts);
ASSERT_OK(DB::Open(opts, dbname, &db));
// Create three SST files
for (size_t i = 0; i < 3; ++i) {
ASSERT_OK(db->Put(WriteOptions(), ToString(i), ToString(i)));
ASSERT_OK(db->Flush(FlushOptions()));
}
// Determine which is the "middle" one
std::vector<LiveFileMetaData> sst_files;
db->GetLiveFilesMetaData(&sst_files);
std::vector<uint64_t> numbers;
for (auto& f : sst_files) {
numbers.push_back(f.file_number);
}
ASSERT_EQ(numbers.size(), 3);
std::sort(numbers.begin(), numbers.end());
uint64_t to_remove = numbers[1];
// Close for unsafe_remove_sst_file
delete db;
db = nullptr;
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "unsafe_remove_sst_file";
char arg4[20];
snprintf(arg4, sizeof(arg4), "%" PRIu64, to_remove);
char* argv[] = {arg1, arg2, arg3, arg4};
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));
// Re-open, and verify with Get that middle file is gone
ASSERT_OK(DB::Open(opts, dbname, &db));
std::string val;
ASSERT_OK(db->Get(ReadOptions(), "0", &val));
ASSERT_EQ(val, "0");
ASSERT_OK(db->Get(ReadOptions(), "2", &val));
ASSERT_EQ(val, "2");
ASSERT_TRUE(db->Get(ReadOptions(), "1", &val).IsNotFound());
// Now with extra CF, two more files
ColumnFamilyHandle* cf_handle;
ColumnFamilyOptions cf_opts;
ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf1", &cf_handle));
for (size_t i = 3; i < 5; ++i) {
ASSERT_OK(db->Put(WriteOptions(), cf_handle, ToString(i), ToString(i)));
ASSERT_OK(db->Flush(FlushOptions(), cf_handle));
}
// Determine which is the "last" one
sst_files.clear();
db->GetLiveFilesMetaData(&sst_files);
numbers.clear();
for (auto& f : sst_files) {
numbers.push_back(f.file_number);
}
ASSERT_EQ(numbers.size(), 4);
std::sort(numbers.begin(), numbers.end());
to_remove = numbers.back();
// Close for unsafe_remove_sst_file
delete cf_handle;
delete db;
db = nullptr;
snprintf(arg4, sizeof(arg4), "%" PRIu64, to_remove);
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));
std::vector<ColumnFamilyDescriptor> cfds = {{kDefaultColumnFamilyName, opts},
{"cf1", cf_opts}};
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(DB::Open(opts, dbname, cfds, &handles, &db));
ASSERT_OK(db->Get(ReadOptions(), handles[1], "3", &val));
ASSERT_EQ(val, "3");
ASSERT_TRUE(db->Get(ReadOptions(), handles[1], "4", &val).IsNotFound());
ASSERT_OK(db->Get(ReadOptions(), handles[0], "0", &val));
ASSERT_EQ(val, "0");
for (auto& h : handles) {
delete h;
}
delete db;
}
TEST_F(LdbCmdTest, FileTemperatureUpdateManifest) {
auto test_fs = std::make_shared<FileTemperatureTestFS>(FileSystem::Default());
std::unique_ptr<Env> env(new CompositeEnvWrapper(Env::Default(), test_fs));
Options opts;
opts.bottommost_temperature = Temperature::kWarm;
opts.level0_file_num_compaction_trigger = 10;
opts.create_if_missing = true;
opts.env = env.get();
DB* db = nullptr;
std::string dbname = test::PerThreadDBPath(env.get(), "ldb_cmd_test");
DestroyDB(dbname, opts);
ASSERT_OK(DB::Open(opts, dbname, &db));
std::array<Temperature, 5> kTestTemps = {
Temperature::kCold, Temperature::kWarm, Temperature::kHot,
Temperature::kWarm, Temperature::kCold};
std::map<uint64_t, Temperature> number_to_temp;
for (size_t i = 0; i < kTestTemps.size(); ++i) {
ASSERT_OK(db->Put(WriteOptions(), ToString(i), ToString(i)));
ASSERT_OK(db->Flush(FlushOptions()));
std::map<uint64_t, Temperature> current_temps;
test_fs->CopyCurrentSstFileTemperatures(&current_temps);
for (auto e : current_temps) {
if (e.second == Temperature::kUnknown) {
test_fs->OverrideSstFileTemperature(e.first, kTestTemps[i]);
number_to_temp[e.first] = kTestTemps[i];
}
}
}
// Close & reopen
delete db;
db = nullptr;
test_fs->PopRequestedSstFileTemperatures();
ASSERT_OK(DB::Open(opts, dbname, &db));
for (size_t i = 0; i < kTestTemps.size(); ++i) {
std::string val;
ASSERT_OK(db->Get(ReadOptions(), ToString(i), &val));
ASSERT_EQ(val, ToString(i));
}
// Still all unknown
std::vector<std::pair<uint64_t, Temperature>> requests;
test_fs->PopRequestedSstFileTemperatures(&requests);
ASSERT_EQ(requests.size(), kTestTemps.size());
for (auto& r : requests) {
ASSERT_EQ(r.second, Temperature::kUnknown);
}
// Close for update_manifest
delete db;
db = nullptr;
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "update_manifest";
char arg4[] = "--update_temperatures";
char* argv[] = {arg1, arg2, arg3, arg4};
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(4, argv, opts, LDBOptions(), nullptr));
// Re-open, get, and verify manifest temps (based on request)
test_fs->PopRequestedSstFileTemperatures();
ASSERT_OK(DB::Open(opts, dbname, &db));
for (size_t i = 0; i < kTestTemps.size(); ++i) {
std::string val;
ASSERT_OK(db->Get(ReadOptions(), ToString(i), &val));
ASSERT_EQ(val, ToString(i));
}
requests.clear();
test_fs->PopRequestedSstFileTemperatures(&requests);
ASSERT_EQ(requests.size(), kTestTemps.size());
for (auto& r : requests) {
ASSERT_EQ(r.second, number_to_temp[r.first]);
}
delete db;
}
TEST_F(LdbCmdTest, RenameDbAndLoadOptions) { TEST_F(LdbCmdTest, RenameDbAndLoadOptions) {
Env* env = TryLoadCustomOrDefaultEnv(); Env* env = TryLoadCustomOrDefaultEnv();
Options opts; Options opts;
@ -975,6 +1169,7 @@ TEST_F(LdbCmdTest, RenameDbAndLoadOptions) {
0, LDBCommandRunner::RunCommand(5, argv5, opts, LDBOptions(), nullptr)); 0, LDBCommandRunner::RunCommand(5, argv5, opts, LDBOptions(), nullptr));
DestroyDB(new_dbname, opts); DestroyDB(new_dbname, opts);
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -105,6 +105,7 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
DBDumperCommand::Help(ret); DBDumperCommand::Help(ret);
DBLoaderCommand::Help(ret); DBLoaderCommand::Help(ret);
ManifestDumpCommand::Help(ret); ManifestDumpCommand::Help(ret);
UpdateManifestCommand::Help(ret);
FileChecksumDumpCommand::Help(ret); FileChecksumDumpCommand::Help(ret);
GetPropertyCommand::Help(ret); GetPropertyCommand::Help(ret);
ListColumnFamiliesCommand::Help(ret); ListColumnFamiliesCommand::Help(ret);

@ -11,6 +11,7 @@
#include <stdexcept> #include <stdexcept>
#include <vector> #include <vector>
#include "port/lang.h"
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -320,6 +321,9 @@ class autovector {
autovector& operator=(const autovector& other) { return assign(other); } autovector& operator=(const autovector& other) { return assign(other); }
autovector(autovector&& other) noexcept { *this = std::move(other); }
autovector& operator=(autovector&& other);
// -- Iterator Operations // -- Iterator Operations
iterator begin() { return iterator(this, 0); } iterator begin() { return iterator(this, 0); }
@ -352,7 +356,8 @@ class autovector {
}; };
template <class T, size_t kSize> template <class T, size_t kSize>
autovector<T, kSize>& autovector<T, kSize>::assign(const autovector& other) { autovector<T, kSize>& autovector<T, kSize>::assign(
const autovector<T, kSize>& other) {
values_ = reinterpret_cast<pointer>(buf_); values_ = reinterpret_cast<pointer>(buf_);
// copy the internal vector // copy the internal vector
vect_.assign(other.vect_.begin(), other.vect_.end()); vect_.assign(other.vect_.begin(), other.vect_.end());
@ -363,5 +368,20 @@ autovector<T, kSize>& autovector<T, kSize>::assign(const autovector& other) {
return *this; return *this;
} }
template <class T, size_t kSize>
autovector<T, kSize>& autovector<T, kSize>::operator=(
autovector<T, kSize>&& other) {
values_ = reinterpret_cast<pointer>(buf_);
vect_ = std::move(other.vect_);
size_t n = other.num_stack_items_;
num_stack_items_ = n;
other.num_stack_items_ = 0;
for (size_t i = 0; i < n; ++i) {
values_[i] = std::move(other.values_[i]);
}
return *this;
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save