Apply automatic formatting to some files (#5114)

Summary:
Following files were run through automatic formatter:
db/db_impl.cc
db/db_impl.h
db/db_impl_compaction_flush.cc
db/db_impl_debug.cc
db/db_impl_files.cc
db/db_impl_readonly.h
db/db_impl_write.cc
db/dbformat.cc
db/dbformat.h
table/block.cc
table/block.h
table/block_based_filter_block.cc
table/block_based_filter_block.h
table/block_based_filter_block_test.cc
table/block_based_table_builder.cc
table/block_based_table_reader.cc
table/block_based_table_reader.h
table/block_builder.cc
table/block_builder.h
table/block_fetcher.cc
table/block_prefix_index.cc
table/block_prefix_index.h
table/block_test.cc
table/format.cc
table/format.h

I could easily run all the files, but I don't want people to feel that
I'm doing it for lines of code changes :)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5114

Differential Revision: D14633040

Pulled By: siying

fbshipit-source-id: 3f346cb53bf21e8c10704400da548dfce1e89a52
main
Siying Dong 6 years ago committed by Facebook Github Bot
parent 1f7f5a5a79
commit 89ab1381f8
  1. 56
      db/db_impl.cc
  2. 69
      db/db_impl.h
  3. 25
      db/db_impl_compaction_flush.cc
  4. 10
      db/db_impl_debug.cc
  5. 33
      db/db_impl_files.cc
  6. 6
      db/db_impl_readonly.h
  7. 28
      db/db_impl_write.cc
  8. 15
      db/dbformat.cc
  9. 27
      db/dbformat.h
  10. 6
      table/block.cc
  11. 22
      table/block.h
  12. 1
      table/block_based_filter_block.cc
  13. 7
      table/block_based_filter_block.h
  14. 12
      table/block_based_filter_block_test.cc
  15. 36
      table/block_based_table_builder.cc
  16. 32
      table/block_based_table_reader.cc
  17. 8
      table/block_based_table_reader.h
  18. 14
      table/block_builder.cc
  19. 18
      table/block_builder.h
  20. 33
      table/block_fetcher.cc
  21. 17
      table/block_prefix_index.cc
  22. 6
      table/block_prefix_index.h
  23. 20
      table/block_test.cc
  24. 47
      table/format.cc
  25. 6
      table/format.h

@ -299,7 +299,8 @@ Status DBImpl::ResumeImpl() {
s = Status::ShutdownInProgress(); s = Status::ShutdownInProgress();
} }
if (s.ok() && bg_error.severity() > Status::Severity::kHardError) { if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"DB resume requested but failed due to Fatal/Unrecoverable error"); "DB resume requested but failed due to Fatal/Unrecoverable error");
s = bg_error; s = bg_error;
} }
@ -370,8 +371,8 @@ Status DBImpl::ResumeImpl() {
// Wake up any waiters - in this case, it could be the shutdown thread // Wake up any waiters - in this case, it could be the shutdown thread
bg_cv_.SignalAll(); bg_cv_.SignalAll();
// No need to check BGError again. If something happened, event listener would be // No need to check BGError again. If something happened, event listener would
// notified and the operation causing it would have failed // be notified and the operation causing it would have failed
return s; return s;
} }
@ -385,7 +386,6 @@ void DBImpl::WaitForBackgroundWork() {
// Will lock the mutex_, will wait for completion if wait is true // Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) { void DBImpl::CancelAllBackgroundWork(bool wait) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work"); "Shutdown: canceling all background work");
@ -572,7 +572,7 @@ Status DBImpl::CloseHelper() {
immutable_db_options_.sst_file_manager.get()); immutable_db_options_.sst_file_manager.get());
sfm->Close(); sfm->Close();
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
if (immutable_db_options_.info_log && own_info_log_) { if (immutable_db_options_.info_log && own_info_log_) {
Status s = immutable_db_options_.info_log->Close(); Status s = immutable_db_options_.info_log->Close();
@ -730,7 +730,8 @@ bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
} }
} }
Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time, Status DBImpl::GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) { std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
if (!stats_iterator) { if (!stats_iterator) {
return Status::InvalidArgument("stats_iterator not preallocated."); return Status::InvalidArgument("stats_iterator not preallocated.");
@ -912,19 +913,18 @@ Status DBImpl::SetDBOptions(
} }
if (new_options.stats_dump_period_sec != if (new_options.stats_dump_period_sec !=
mutable_db_options_.stats_dump_period_sec) { mutable_db_options_.stats_dump_period_sec) {
if (thread_dump_stats_) { if (thread_dump_stats_) {
mutex_.Unlock(); mutex_.Unlock();
thread_dump_stats_->cancel(); thread_dump_stats_->cancel();
mutex_.Lock(); mutex_.Lock();
} }
if (new_options.stats_dump_period_sec > 0) { if (new_options.stats_dump_period_sec > 0) {
thread_dump_stats_.reset(new rocksdb::RepeatableThread( thread_dump_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::DumpStats(); }, "dump_st", env_, [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
new_options.stats_dump_period_sec * 1000000)); new_options.stats_dump_period_sec * 1000000));
} } else {
else { thread_dump_stats_.reset();
thread_dump_stats_.reset(); }
}
} }
if (new_options.stats_persist_period_sec != if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) { mutable_db_options_.stats_persist_period_sec) {
@ -1939,7 +1939,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
ReadCallback* read_callback = nullptr; // No read callback provided. ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) { if (read_options.tailing) {
#ifdef ROCKSDB_LITE #ifdef ROCKSDB_LITE
// not supported in lite version // not supported in lite version
result = nullptr; result = nullptr;
@ -2785,7 +2785,8 @@ Status DBImpl::GetDbIdentity(std::string& identity) const {
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
char* buffer = reinterpret_cast<char*>(alloca(static_cast<size_t>(file_size))); char* buffer =
reinterpret_cast<char*>(alloca(static_cast<size_t>(file_size)));
Slice id; Slice id;
s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer); s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer);
if (!s.ok()) { if (!s.ok()) {
@ -2872,7 +2873,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname); InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
for (const auto& fname : filenames) { for (const auto& fname : filenames) {
if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) && if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
type != kDBLockFile) { // Lock file will be deleted at end type != kDBLockFile) { // Lock file will be deleted at end
Status del; Status del;
std::string path_to_delete = dbname + "/" + fname; std::string path_to_delete = dbname + "/" + fname;
if (type == kMetaDatabase) { if (type == kMetaDatabase) {
@ -2910,7 +2911,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (env->GetChildren(path, &filenames).ok()) { if (env->GetChildren(path, &filenames).ok()) {
for (const auto& fname : filenames) { for (const auto& fname : filenames) {
if (ParseFileName(fname, &number, &type) && if (ParseFileName(fname, &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + fname; std::string table_path = path + "/" + fname;
Status del = DeleteSSTFile(&soptions, table_path, dbname); Status del = DeleteSSTFile(&soptions, table_path, dbname);
if (result.ok() && !del.ok()) { if (result.ok() && !del.ok()) {
@ -2937,8 +2938,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (env->GetChildren(archivedir, &archiveFiles).ok()) { if (env->GetChildren(archivedir, &archiveFiles).ok()) {
// Delete archival files. // Delete archival files.
for (const auto& file : archiveFiles) { for (const auto& file : archiveFiles) {
if (ParseFileName(file, &number, &type) && if (ParseFileName(file, &number, &type) && type == kLogFile) {
type == kLogFile) {
Status del = env->DeleteFile(archivedir + "/" + file); Status del = env->DeleteFile(archivedir + "/" + file);
if (result.ok() && !del.ok()) { if (result.ok() && !del.ok()) {
result = del; result = del;
@ -3147,7 +3147,7 @@ void DumpRocksDBBuildVersion(Logger* log) {
ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha); ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date); ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
#else #else
(void)log; // ignore "-Wunused-parameter" (void)log; // ignore "-Wunused-parameter"
#endif #endif
} }
@ -3585,8 +3585,8 @@ Status DBImpl::VerifyChecksum() {
Options opts; Options opts;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
opts = Options(BuildDBOptions(immutable_db_options_, opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
mutable_db_options_), cfd->GetLatestCFOptions()); cfd->GetLatestCFOptions());
} }
for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) { for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok(); for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();

@ -178,10 +178,9 @@ class DBImpl : public DB {
virtual bool GetAggregatedIntProperty(const Slice& property, virtual bool GetAggregatedIntProperty(const Slice& property,
uint64_t* aggregated_value) override; uint64_t* aggregated_value) override;
using DB::GetApproximateSizes; using DB::GetApproximateSizes;
virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, virtual void GetApproximateSizes(
const Range* range, int n, uint64_t* sizes, ColumnFamilyHandle* column_family, const Range* range, int n,
uint8_t include_flags uint64_t* sizes, uint8_t include_flags = INCLUDE_FILES) override;
= INCLUDE_FILES) override;
using DB::GetApproximateMemTableStats; using DB::GetApproximateMemTableStats;
virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family, virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
const Range& range, const Range& range,
@ -284,9 +283,8 @@ class DBImpl : public DB {
// Status::NotFound() will be returned if the current DB does not have // Status::NotFound() will be returned if the current DB does not have
// any column family match the specified name. // any column family match the specified name.
// TODO(yhchiang): output parameter is placed in the end in this codebase. // TODO(yhchiang): output parameter is placed in the end in this codebase.
virtual void GetColumnFamilyMetaData( virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
ColumnFamilyHandle* column_family, ColumnFamilyMetaData* metadata) override;
ColumnFamilyMetaData* metadata) override;
Status SuggestCompactRange(ColumnFamilyHandle* column_family, Status SuggestCompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override; const Slice* begin, const Slice* end) override;
@ -378,9 +376,8 @@ class DBImpl : public DB {
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id, int output_level, uint32_t output_path_id,
uint32_t max_subcompactions, uint32_t max_subcompactions, const Slice* begin,
const Slice* begin, const Slice* end, const Slice* end, bool exclusive,
bool exclusive,
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
// Return an internal iterator over the current state of the database. // Return an internal iterator over the current state of the database.
@ -427,8 +424,8 @@ class DBImpl : public DB {
// Return the maximum overlapping data (in bytes) at next level for any // Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1. // file at a level >= 1.
int64_t TEST_MaxNextLevelOverlappingBytes(ColumnFamilyHandle* column_family = int64_t TEST_MaxNextLevelOverlappingBytes(
nullptr); ColumnFamilyHandle* column_family = nullptr);
// Return the current manifest file no. // Return the current manifest file no.
uint64_t TEST_Current_Manifest_FileNo(); uint64_t TEST_Current_Manifest_FileNo();
@ -801,13 +798,12 @@ class DBImpl : public DB {
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop); int job_id, TableProperties prop);
void NotifyOnCompactionBegin(ColumnFamilyData* cfd, void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
Compaction *c, const Status &st, const Status& st,
const CompactionJobStats& job_stats, const CompactionJobStats& job_stats, int job_id);
int job_id);
void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c,
Compaction *c, const Status &st, const Status& st,
const CompactionJobStats& job_stats, const CompactionJobStats& job_stats,
int job_id); int job_id);
void NotifyOnMemTableSealed(ColumnFamilyData* cfd, void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
@ -1197,7 +1193,8 @@ class DBImpl : public DB {
// Return the minimum empty level that could hold the total data in the // Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found. // input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level); const MutableCFOptions& mutable_cf_options,
int level);
// Move the files in the input level to the target level. // Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could // If target_level < 0, automatically calculate the minimum level that could
@ -1274,8 +1271,7 @@ class DBImpl : public DB {
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_; std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize { struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
: number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; } void AddSize(uint64_t new_size) { size += new_size; }
uint64_t number; uint64_t number;
uint64_t size = 0; uint64_t size = 0;
@ -1487,15 +1483,15 @@ class DBImpl : public DB {
uint32_t output_path_id; uint32_t output_path_id;
Status status; Status status;
bool done; bool done;
bool in_progress; // compaction request being processed? bool in_progress; // compaction request being processed?
bool incomplete; // only part of requested range compacted bool incomplete; // only part of requested range compacted
bool exclusive; // current behavior of only one manual bool exclusive; // current behavior of only one manual
bool disallow_trivial_move; // Force actual compaction to run bool disallow_trivial_move; // Force actual compaction to run
const InternalKey* begin; // nullptr means beginning of key range const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range const InternalKey* end; // nullptr means end of key range
InternalKey* manual_end; // how far we are compacting InternalKey* manual_end; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress InternalKey tmp_storage1; // Used to keep track of compaction progress
}; };
struct PrepickedCompaction { struct PrepickedCompaction {
// background compaction takes ownership of `compaction`. // background compaction takes ownership of `compaction`.
@ -1619,9 +1615,9 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables; using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, virtual Status GetPropertiesOfAllTables(
TablePropertiesCollection* props) ColumnFamilyHandle* column_family,
override; TablePropertiesCollection* props) override;
virtual Status GetPropertiesOfTablesInRange( virtual Status GetPropertiesOfTablesInRange(
ColumnFamilyHandle* column_family, const Range* range, std::size_t n, ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
TablePropertiesCollection* props) override; TablePropertiesCollection* props) override;
@ -1659,9 +1655,7 @@ class DBImpl : public DB {
void MarkAsGrabbedForPurge(uint64_t file_number); void MarkAsGrabbedForPurge(uint64_t file_number);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() { Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
return Env::WLTH_SHORT;
}
// When set, we use a separate queue for writes that dont write to memtable. // When set, we use a separate queue for writes that dont write to memtable.
// In 2PC these are the writes at Prepare phase. // In 2PC these are the writes at Prepare phase.
@ -1723,8 +1717,7 @@ class DBImpl : public DB {
InstrumentedCondVar atomic_flush_install_cv_; InstrumentedCondVar atomic_flush_install_cv_;
}; };
extern Options SanitizeOptions(const std::string& db, extern Options SanitizeOptions(const std::string& db, const Options& src);
const Options& src);
extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src); extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);

@ -141,7 +141,6 @@ Status DBImpl::FlushMemTableToOutputFile(
assert(cfd->imm()->NumNotFlushed() != 0); assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending()); assert(cfd->imm()->IsFlushPending());
FlushJob flush_job( FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options, dbname_, cfd, immutable_db_options_, mutable_cf_options,
nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(), nullptr /* memtable_id */, env_options_for_compaction_, versions_.get(),
@ -150,8 +149,7 @@ Status DBImpl::FlushMemTableToOutputFile(
GetDataDir(cfd, 0U), GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats, &event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, true /* sync_output_directory */, true /* write_manifest */, thread_pri);
thread_pri);
FileMetaData file_meta; FileMetaData file_meta;
@ -220,7 +218,8 @@ Status DBImpl::FlushMemTableToOutputFile(
cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber()); cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
sfm->OnAddFile(file_path); sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached()) { if (sfm->IsMaxAllowedSpaceReached()) {
Status new_bg_error = Status::SpaceLimit("Max allowed space was reached"); Status new_bg_error =
Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
&new_bg_error); &new_bg_error);
@ -236,9 +235,8 @@ Status DBImpl::FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) { JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
if (immutable_db_options_.atomic_flush) { if (immutable_db_options_.atomic_flush) {
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress, return AtomicFlushMemTablesToOutputFiles(
job_context, log_buffer, bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
thread_pri);
} }
std::vector<SequenceNumber> snapshot_seqs; std::vector<SequenceNumber> snapshot_seqs;
SequenceNumber earliest_write_conflict_snapshot; SequenceNumber earliest_write_conflict_snapshot;
@ -742,7 +740,8 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
} }
} }
s = RunManualCompaction(cfd, level, output_level, options.target_path_id, s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
options.max_subcompactions, begin, end, exclusive); options.max_subcompactions, begin, end,
exclusive);
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
@ -1065,8 +1064,8 @@ Status DBImpl::ContinueBackgroundWork() {
return Status::OK(); return Status::OK();
} }
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
Compaction *c, const Status &st, const Status& st,
const CompactionJobStats& job_stats, const CompactionJobStats& job_stats,
int job_id) { int job_id) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -1626,7 +1625,7 @@ Status DBImpl::AtomicFlushMemTables(
// it against various constrains and delays flush if it'd cause write stall. // it against various constrains and delays flush if it'd cause write stall.
// Called should check status and flush_needed to see if flush already happened. // Called should check status and flush_needed to see if flush already happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed) { bool* flush_needed) {
{ {
*flush_needed = true; *flush_needed = true;
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
@ -1774,7 +1773,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// we paused the background work // we paused the background work
return; return;
} else if (error_handler_.IsBGWorkStopped() && } else if (error_handler_.IsBGWorkStopped() &&
!error_handler_.IsRecoveryInProgress()) { !error_handler_.IsRecoveryInProgress()) {
// There has been a hard error and this call is not part of the recovery // There has been a hard error and this call is not part of the recovery
// sequence. Bail out here so we don't get into an endless loop of // sequence. Bail out here so we don't get into an endless loop of
// scheduling BG work which will again call this function // scheduling BG work which will again call this function
@ -2194,7 +2193,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
if (s.IsBusy()) { if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock(); mutex_.Unlock();
env_->SleepForMicroseconds(10000); // prevent hot loop env_->SleepForMicroseconds(10000); // prevent hot loop
mutex_.Lock(); mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress()) { } else if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in // Wait a little bit before retrying background compaction in

@ -101,7 +101,7 @@ Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
} }
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall, Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
ColumnFamilyHandle* cfh) { ColumnFamilyHandle* cfh) {
FlushOptions fo; FlushOptions fo;
fo.wait = wait; fo.wait = wait;
fo.allow_write_stall = allow_write_stall; fo.allow_write_stall = allow_write_stall;
@ -143,13 +143,9 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
return error_handler_.GetBGError(); return error_handler_.GetBGError();
} }
void DBImpl::TEST_LockMutex() { void DBImpl::TEST_LockMutex() { mutex_.Lock(); }
mutex_.Lock();
}
void DBImpl::TEST_UnlockMutex() { void DBImpl::TEST_UnlockMutex() { mutex_.Unlock(); }
mutex_.Unlock();
}
void* DBImpl::TEST_BeginWrite() { void* DBImpl::TEST_BeginWrite() {
auto w = new WriteThread::Writer(); auto w = new WriteThread::Writer();

@ -108,7 +108,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
versions_->AddLiveFiles(&job_context->sst_live); versions_->AddLiveFiles(&job_context->sst_live);
if (doing_the_full_scan) { if (doing_the_full_scan) {
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_); dbname_);
std::set<std::string> paths; std::set<std::string> paths;
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size(); for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
path_id++) { path_id++) {
@ -152,8 +152,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
} }
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
job_context->full_scan_candidate_files.emplace_back( job_context->full_scan_candidate_files.emplace_back("/" + file, path);
"/" + file, path);
} }
} }
@ -163,8 +162,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
env_->GetChildren(immutable_db_options_.wal_dir, env_->GetChildren(immutable_db_options_.wal_dir,
&log_files); // Ignore errors &log_files); // Ignore errors
for (const std::string& log_file : log_files) { for (const std::string& log_file : log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, job_context->full_scan_candidate_files.emplace_back(
immutable_db_options_.wal_dir); log_file, immutable_db_options_.wal_dir);
} }
} }
// Add info log files in db_log_dir // Add info log files in db_log_dir
@ -174,8 +173,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// Ignore errors // Ignore errors
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files); env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
for (std::string& log_file : info_log_files) { for (std::string& log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, job_context->full_scan_candidate_files.emplace_back(
immutable_db_options_.db_log_dir); log_file, immutable_db_options_.db_log_dir);
} }
} }
} }
@ -267,7 +266,7 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
file_deletion_status = env_->DeleteFile(fname); file_deletion_status = env_->DeleteFile(fname);
} }
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
&file_deletion_status); &file_deletion_status);
if (file_deletion_status.ok()) { if (file_deletion_status.ok()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id, "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
@ -322,7 +321,8 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
const char* kDumbDbName = ""; const char* kDumbDbName = "";
for (auto& file : state.sst_delete_files) { for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back( candidate_files.emplace_back(
MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), file.path); MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()),
file.path);
if (file.metadata->table_reader_handle) { if (file.metadata->table_reader_handle) {
table_cache_->Release(file.metadata->table_reader_handle); table_cache_->Release(file.metadata->table_reader_handle);
} }
@ -332,7 +332,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
for (auto file_num : state.log_delete_files) { for (auto file_num : state.log_delete_files) {
if (file_num > 0) { if (file_num > 0) {
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), candidate_files.emplace_back(LogFileName(kDumbDbName, file_num),
immutable_db_options_.wal_dir); immutable_db_options_.wal_dir);
} }
} }
for (const auto& filename : state.manifest_delete_files) { for (const auto& filename : state.manifest_delete_files) {
@ -465,13 +465,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
} else { } else {
dir_to_sync = dir_to_sync =
(type == kLogFile) ? immutable_db_options_.wal_dir : dbname_; (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
fname = dir_to_sync fname = dir_to_sync +
+ ( ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
(!dir_to_sync.empty() && dir_to_sync.back() == '/') || (!to_delete.empty() && to_delete.front() == '/')
(!to_delete.empty() && to_delete.front() == '/') ? ""
? "" : "/" : "/") +
) to_delete;
+ to_delete;
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

@ -7,9 +7,9 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "db/db_impl.h"
#include <vector>
#include <string> #include <string>
#include <vector>
#include "db/db_impl.h"
namespace rocksdb { namespace rocksdb {
@ -122,6 +122,6 @@ class DBImplReadOnly : public DBImpl {
DBImplReadOnly(const DBImplReadOnly&); DBImplReadOnly(const DBImplReadOnly&);
void operator=(const DBImplReadOnly&); void operator=(const DBImplReadOnly&);
}; };
} } // namespace rocksdb
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE

@ -868,7 +868,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) { if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false; cached_recoverable_state_empty_ = false;
} }
if (status.ok() && need_log_sync) { if (status.ok() && need_log_sync) {
@ -944,7 +944,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) { if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false; cached_recoverable_state_empty_ = false;
} }
log_write_mutex_.Unlock(); log_write_mutex_.Unlock();
@ -1064,16 +1064,17 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
if (!flush_wont_release_oldest_log) { if (!flush_wont_release_oldest_log) {
// we only mark this log as getting flushed if we have successfully // we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepared // flushed all data in this log. If this log contains outstanding prepared
// transactions then we cannot flush this log until those transactions are commited. // transactions then we cannot flush this log until those transactions are
// commited.
unable_to_release_oldest_log_ = false; unable_to_release_oldest_log_ = false;
alive_log_files_.begin()->getting_flushed = true; alive_log_files_.begin()->getting_flushed = true;
} }
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(
"Flushing all column families with data in WAL number %" PRIu64 immutable_db_options_.info_log,
". Total log size is %" PRIu64 "Flushing all column families with data in WAL number %" PRIu64
" while max_total_wal_size is %" PRIu64, ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't // no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread // happen while we're in the write thread
autovector<ColumnFamilyData*> cfds; autovector<ColumnFamilyData*> cfds;
@ -1419,7 +1420,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
DBOptions db_options = DBOptions db_options =
BuildDBOptions(immutable_db_options_, mutable_db_options_); BuildDBOptions(immutable_db_options_, mutable_db_options_);
const auto preallocate_block_size = const auto preallocate_block_size =
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
auto write_hint = CalculateWALWriteHint(); auto write_hint = CalculateWALWriteHint();
mutex_.Unlock(); mutex_.Unlock();
{ {
@ -1461,7 +1462,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
context->superversion_context.NewSuperVersion(); context->superversion_context.NewSuperVersion();
} }
} }
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] New memtable created with log file: #%" PRIu64 "[%s] New memtable created with log file: #%" PRIu64
@ -1554,13 +1554,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const { size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
mutex_.AssertHeld(); mutex_.AssertHeld();
size_t bsize = static_cast<size_t>( size_t bsize =
write_buffer_size / 10 + write_buffer_size); static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
// Some users might set very high write_buffer_size and rely on // Some users might set very high write_buffer_size and rely on
// max_total_wal_size or other parameters to control the WAL size. // max_total_wal_size or other parameters to control the WAL size.
if (mutable_db_options_.max_total_wal_size > 0) { if (mutable_db_options_.max_total_wal_size > 0) {
bsize = std::min<size_t>(bsize, static_cast<size_t>( bsize = std::min<size_t>(
mutable_db_options_.max_total_wal_size)); bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
} }
if (immutable_db_options_.db_write_buffer_size > 0) { if (immutable_db_options_.db_write_buffer_size > 0) {
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size); bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);

@ -106,9 +106,7 @@ std::string InternalKey::DebugString(bool hex) const {
return result; return result;
} }
const char* InternalKeyComparator::Name() const { const char* InternalKeyComparator::Name() const { return name_.c_str(); }
return name_.c_str();
}
int InternalKeyComparator::Compare(const ParsedInternalKey& a, int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const { const ParsedInternalKey& b) const {
@ -131,9 +129,8 @@ int InternalKeyComparator::Compare(const ParsedInternalKey& a,
return r; return r;
} }
void InternalKeyComparator::FindShortestSeparator( void InternalKeyComparator::FindShortestSeparator(std::string* start,
std::string* start, const Slice& limit) const {
const Slice& limit) const {
// Attempt to shorten the user portion of the key // Attempt to shorten the user portion of the key
Slice user_start = ExtractUserKey(*start); Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit); Slice user_limit = ExtractUserKey(limit);
@ -143,7 +140,8 @@ void InternalKeyComparator::FindShortestSeparator(
user_comparator_.Compare(user_start, tmp) < 0) { user_comparator_.Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically. // User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key. // Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek)); PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0); assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0); assert(this->Compare(tmp, limit) < 0);
start->swap(tmp); start->swap(tmp);
@ -158,7 +156,8 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
user_comparator_.Compare(user_key, tmp) < 0) { user_comparator_.Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically. // User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key. // Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek)); PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0); assert(this->Compare(*key, tmp) < 0);
key->swap(tmp); key->swap(tmp);
} }

@ -81,8 +81,7 @@ inline bool IsExtendedValueType(ValueType t) {
// We leave eight bits empty at the bottom so a type and sequence# // We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits. // can be packed together into 64-bits.
static const SequenceNumber kMaxSequenceNumber = static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
((0x1ull << 56) - 1);
static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64; static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
@ -93,9 +92,9 @@ struct ParsedInternalKey {
ParsedInternalKey() ParsedInternalKey()
: sequence(kMaxSequenceNumber) // Make code analyzer happy : sequence(kMaxSequenceNumber) // Make code analyzer happy
{} // Intentionally left uninitialized (for speed) {} // Intentionally left uninitialized (for speed)
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t) ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) { } : user_key(u), sequence(seq), type(t) {}
std::string DebugString(bool hex = false) const; std::string DebugString(bool hex = false) const;
void clear() { void clear() {
@ -163,6 +162,7 @@ class InternalKeyComparator
private: private:
UserComparatorWrapper user_comparator_; UserComparatorWrapper user_comparator_;
std::string name_; std::string name_;
public: public:
explicit InternalKeyComparator(const Comparator* c) explicit InternalKeyComparator(const Comparator* c)
: user_comparator_(c), : user_comparator_(c),
@ -195,8 +195,9 @@ class InternalKeyComparator
class InternalKey { class InternalKey {
private: private:
std::string rep_; std::string rep_;
public: public:
InternalKey() { } // Leave rep_ as empty to indicate it is invalid InternalKey() {} // Leave rep_ as empty to indicate it is invalid
InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) { InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t)); AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t));
} }
@ -253,8 +254,8 @@ class InternalKey {
std::string DebugString(bool hex = false) const; std::string DebugString(bool hex = false) const;
}; };
inline int InternalKeyComparator::Compare( inline int InternalKeyComparator::Compare(const InternalKey& a,
const InternalKey& a, const InternalKey& b) const { const InternalKey& b) const {
return Compare(a.Encode(), b.Encode()); return Compare(a.Encode(), b.Encode());
} }
@ -291,7 +292,6 @@ inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
return num >> 8; return num >> 8;
} }
// A helper class useful for DBImpl::Get() // A helper class useful for DBImpl::Get()
class LookupKey { class LookupKey {
public: public:
@ -327,7 +327,7 @@ class LookupKey {
const char* start_; const char* start_;
const char* kstart_; const char* kstart_;
const char* end_; const char* end_;
char space_[200]; // Avoid allocation for short keys char space_[200]; // Avoid allocation for short keys
// No copying allowed // No copying allowed
LookupKey(const LookupKey&); LookupKey(const LookupKey&);
@ -636,8 +636,8 @@ struct RangeTombstone {
} }
}; };
inline inline int InternalKeyComparator::Compare(const Slice& akey,
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { const Slice& bkey) const {
// Order by: // Order by:
// increasing user key (according to user-supplied comparator) // increasing user key (according to user-supplied comparator)
// decreasing sequence number // decreasing sequence number
@ -655,9 +655,8 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
return r; return r;
} }
inline inline int InternalKeyComparator::CompareKeySeq(const Slice& akey,
int InternalKeyComparator::CompareKeySeq(const Slice& akey, const Slice& bkey) const {
const Slice& bkey) const {
// Order by: // Order by:
// increasing user key (according to user-supplied comparator) // increasing user key (according to user-supplied comparator)
// decreasing sequence number // decreasing sequence number

@ -769,13 +769,13 @@ bool IndexBlockIter::PrefixSeek(const Slice& target, uint32_t* index) {
if (num_blocks == 0) { if (num_blocks == 0) {
current_ = restarts_; current_ = restarts_;
return false; return false;
} else { } else {
return BinaryBlockIndexSeek(seek_key, block_ids, 0, num_blocks - 1, index); return BinaryBlockIndexSeek(seek_key, block_ids, 0, num_blocks - 1, index);
} }
} }
uint32_t Block::NumRestarts() const { uint32_t Block::NumRestarts() const {
assert(size_ >= 2*sizeof(uint32_t)); assert(size_ >= 2 * sizeof(uint32_t));
uint32_t block_footer = DecodeFixed32(data_ + size_ - sizeof(uint32_t)); uint32_t block_footer = DecodeFixed32(data_ + size_ - sizeof(uint32_t));
uint32_t num_restarts = block_footer; uint32_t num_restarts = block_footer;
if (size_ > kMaxBlockSizeSupportedByHashIndex) { if (size_ > kMaxBlockSizeSupportedByHashIndex) {
@ -866,7 +866,7 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
default: default:
size_ = 0; // Error marker size_ = 0; // Error marker
} }
} }
if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) { if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) {
read_amp_bitmap_.reset(new BlockReadAmpBitmap( read_amp_bitmap_.reset(new BlockReadAmpBitmap(
restart_offset_, read_amp_bytes_per_bit, statistics)); restart_offset_, read_amp_bytes_per_bit, statistics));

@ -53,8 +53,8 @@ class BlockReadAmpBitmap {
: bitmap_(nullptr), : bitmap_(nullptr),
bytes_per_bit_pow_(0), bytes_per_bit_pow_(0),
statistics_(statistics), statistics_(statistics),
rnd_( rnd_(Random::GetTLSInstance()->Uniform(
Random::GetTLSInstance()->Uniform(static_cast<int>(bytes_per_bit))) { static_cast<int>(bytes_per_bit))) {
TEST_SYNC_POINT_CALLBACK("BlockReadAmpBitmap:rnd", &rnd_); TEST_SYNC_POINT_CALLBACK("BlockReadAmpBitmap:rnd", &rnd_);
assert(block_size > 0 && bytes_per_bit > 0); assert(block_size > 0 && bytes_per_bit > 0);
@ -64,8 +64,7 @@ class BlockReadAmpBitmap {
} }
// num_bits_needed = ceil(block_size / bytes_per_bit) // num_bits_needed = ceil(block_size / bytes_per_bit)
size_t num_bits_needed = size_t num_bits_needed = ((block_size - 1) >> bytes_per_bit_pow_) + 1;
((block_size - 1) >> bytes_per_bit_pow_) + 1;
assert(num_bits_needed > 0); assert(num_bits_needed > 0);
// bitmap_size = ceil(num_bits_needed / kBitsPerEntry) // bitmap_size = ceil(num_bits_needed / kBitsPerEntry)
@ -204,9 +203,9 @@ class Block {
private: private:
BlockContents contents_; BlockContents contents_;
const char* data_; // contents_.data.data() const char* data_; // contents_.data.data()
size_t size_; // contents_.data.size() size_t size_; // contents_.data.size()
uint32_t restart_offset_; // Offset in data_ of restart array uint32_t restart_offset_; // Offset in data_ of restart array
uint32_t num_restarts_; uint32_t num_restarts_;
std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_; std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;
// All keys in the block will have seqno = global_seqno_, regardless of // All keys in the block will have seqno = global_seqno_, regardless of
@ -226,8 +225,8 @@ class BlockIter : public InternalIteratorBase<TValue> {
void InitializeBase(const Comparator* comparator, const char* data, void InitializeBase(const Comparator* comparator, const char* data,
uint32_t restarts, uint32_t num_restarts, uint32_t restarts, uint32_t num_restarts,
SequenceNumber global_seqno, bool block_contents_pinned) { SequenceNumber global_seqno, bool block_contents_pinned) {
assert(data_ == nullptr); // Ensure it is called only once assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid assert(num_restarts > 0); // Ensure the param is valid
comparator_ = comparator; comparator_ = comparator;
data_ = data; data_ = data;
@ -295,7 +294,7 @@ class BlockIter : public InternalIteratorBase<TValue> {
// Index of restart block in which current_ or current_-1 falls // Index of restart block in which current_ or current_-1 falls
uint32_t restart_index_; uint32_t restart_index_;
uint32_t restarts_; // Offset of restart array (list of fixed32) uint32_t restarts_; // Offset of restart array (list of fixed32)
// current_ is offset in data_ of current entry. >= restarts_ if !Valid // current_ is offset in data_ of current entry. >= restarts_ if !Valid
uint32_t current_; uint32_t current_;
IterKey key_; IterKey key_;
@ -548,8 +547,7 @@ class IndexBlockIter final : public BlockIter<BlockHandle> {
bool PrefixSeek(const Slice& target, uint32_t* index); bool PrefixSeek(const Slice& target, uint32_t* index);
bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids, bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids,
uint32_t left, uint32_t right, uint32_t left, uint32_t right, uint32_t* index);
uint32_t* index);
inline int CompareBlockKey(uint32_t block_index, const Slice& target); inline int CompareBlockKey(uint32_t block_index, const Slice& target);
inline int Compare(const Slice& a, const Slice& b) const { inline int Compare(const Slice& a, const Slice& b) const {

@ -53,7 +53,6 @@ void AppendItem(std::string* props, const TKey& key, const std::string& value) {
} }
} // namespace } // namespace
// See doc/table_format.txt for an explanation of the filter block format. // See doc/table_format.txt for an explanation of the filter block format.
// Generate new filter every 2KB of data // Generate new filter every 2KB of data

@ -15,8 +15,8 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <string>
#include <memory> #include <memory>
#include <string>
#include <vector> #include <vector>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
@ -26,7 +26,6 @@
namespace rocksdb { namespace rocksdb {
// A BlockBasedFilterBlockBuilder is used to construct all of the filters for a // A BlockBasedFilterBlockBuilder is used to construct all of the filters for a
// particular Table. It generates a single string which is stored as // particular Table. It generates a single string which is stored as
// a special block in the Table. // a special block in the Table.
@ -36,7 +35,7 @@ namespace rocksdb {
class BlockBasedFilterBlockBuilder : public FilterBlockBuilder { class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
public: public:
BlockBasedFilterBlockBuilder(const SliceTransform* prefix_extractor, BlockBasedFilterBlockBuilder(const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt); const BlockBasedTableOptions& table_opt);
virtual bool IsBlockBased() override { return true; } virtual bool IsBlockBased() override { return true; }
virtual void StartBlock(uint64_t block_offset) override; virtual void StartBlock(uint64_t block_offset) override;
@ -66,7 +65,7 @@ class BlockBasedFilterBlockBuilder : public FilterBlockBuilder {
std::string result_; // Filter data computed so far std::string result_; // Filter data computed so far
std::vector<Slice> tmp_entries_; // policy_->CreateFilter() argument std::vector<Slice> tmp_entries_; // policy_->CreateFilter() argument
std::vector<uint32_t> filter_offsets_; std::vector<uint32_t> filter_offsets_;
size_t num_added_; // Number of keys added size_t num_added_; // Number of keys added
// No copying allowed // No copying allowed
BlockBasedFilterBlockBuilder(const BlockBasedFilterBlockBuilder&); BlockBasedFilterBlockBuilder(const BlockBasedFilterBlockBuilder&);

@ -148,8 +148,8 @@ class BlockBasedFilterBlockTest : public testing::Test {
}; };
TEST_F(BlockBasedFilterBlockTest, BlockBasedEmptyBuilder) { TEST_F(BlockBasedFilterBlockTest, BlockBasedEmptyBuilder) {
FilterBlockBuilder* builder = new BlockBasedFilterBlockBuilder( FilterBlockBuilder* builder =
nullptr, table_options_); new BlockBasedFilterBlockBuilder(nullptr, table_options_);
BlockContents block(builder->Finish()); BlockContents block(builder->Finish());
ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block.data)); ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block.data));
FilterBlockReader* reader = new BlockBasedFilterBlockReader( FilterBlockReader* reader = new BlockBasedFilterBlockReader(
@ -162,8 +162,8 @@ TEST_F(BlockBasedFilterBlockTest, BlockBasedEmptyBuilder) {
} }
TEST_F(BlockBasedFilterBlockTest, BlockBasedSingleChunk) { TEST_F(BlockBasedFilterBlockTest, BlockBasedSingleChunk) {
FilterBlockBuilder* builder = new BlockBasedFilterBlockBuilder( FilterBlockBuilder* builder =
nullptr, table_options_); new BlockBasedFilterBlockBuilder(nullptr, table_options_);
builder->StartBlock(100); builder->StartBlock(100);
builder->Add("foo"); builder->Add("foo");
builder->Add("bar"); builder->Add("bar");
@ -188,8 +188,8 @@ TEST_F(BlockBasedFilterBlockTest, BlockBasedSingleChunk) {
} }
TEST_F(BlockBasedFilterBlockTest, BlockBasedMultiChunk) { TEST_F(BlockBasedFilterBlockTest, BlockBasedMultiChunk) {
FilterBlockBuilder* builder = new BlockBasedFilterBlockBuilder( FilterBlockBuilder* builder =
nullptr, table_options_); new BlockBasedFilterBlockBuilder(nullptr, table_options_);
// First filter // First filter
builder->StartBlock(0); builder->StartBlock(0);

@ -80,9 +80,11 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
// until index builder actully cuts the partition, we take the lower bound // until index builder actully cuts the partition, we take the lower bound
// as partition size. // as partition size.
assert(table_opt.block_size_deviation <= 100); assert(table_opt.block_size_deviation <= 100);
auto partition_size = static_cast<uint32_t>( auto partition_size =
((table_opt.metadata_block_size * static_cast<uint32_t>(((table_opt.metadata_block_size *
(100 - table_opt.block_size_deviation)) + 99) / 100); (100 - table_opt.block_size_deviation)) +
99) /
100);
partition_size = std::max(partition_size, static_cast<uint32_t>(1)); partition_size = std::max(partition_size, static_cast<uint32_t>(1));
return new PartitionedFilterBlockBuilder( return new PartitionedFilterBlockBuilder(
mopt.prefix_extractor.get(), table_opt.whole_key_filtering, mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
@ -596,8 +598,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
Slice block_contents; Slice block_contents;
bool abort_compression = false; bool abort_compression = false;
StopWatchNano timer(r->ioptions.env, StopWatchNano timer(
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); r->ioptions.env,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (r->state == Rep::State::kBuffered) { if (r->state == Rep::State::kBuffered) {
assert(is_data_block); assert(is_data_block);
@ -736,11 +739,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
XXH64_state_t* const state = XXH64_createState(); XXH64_state_t* const state = XXH64_createState();
XXH64_reset(state, 0); XXH64_reset(state, 0);
XXH64_update(state, block_contents.data(), XXH64_update(state, block_contents.data(),
static_cast<uint32_t>(block_contents.size())); static_cast<uint32_t>(block_contents.size()));
XXH64_update(state, trailer, 1); // Extend to cover block type XXH64_update(state, trailer, 1); // Extend to cover block type
EncodeFixed32(trailer_without_type, EncodeFixed32(
static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits trailer_without_type,
uint64_t{0xffffffff})); static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
uint64_t{0xffffffff}));
XXH64_freeState(state); XXH64_freeState(state);
break; break;
} }
@ -770,9 +774,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
} }
} }
Status BlockBasedTableBuilder::status() const { Status BlockBasedTableBuilder::status() const { return rep_->status; }
return rep_->status;
}
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) { static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value); BlockContents* bc = reinterpret_cast<BlockContents*>(value);
@ -789,7 +791,6 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Cache* block_cache_compressed = r->table_options.block_cache_compressed.get(); Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
if (type != kNoCompression && block_cache_compressed != nullptr) { if (type != kNoCompression && block_cache_compressed != nullptr) {
size_t size = block_contents.size(); size_t size = block_contents.size();
auto ubuf = auto ubuf =
@ -805,11 +806,10 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
// make cache key by appending the file offset to the cache prefix id // make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64( char* end = EncodeVarint64(
r->compressed_cache_key_prefix + r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
r->compressed_cache_key_prefix_size, handle->offset());
handle->offset()); Slice key(r->compressed_cache_key_prefix,
Slice key(r->compressed_cache_key_prefix, static_cast<size_t> static_cast<size_t>(end - r->compressed_cache_key_prefix));
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache. // Insert into compressed block cache.
block_cache_compressed->Insert( block_cache_compressed->Insert(

@ -154,8 +154,7 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
} }
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
int level, int level, Tickers block_cache_miss_ticker,
Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker, Tickers block_cache_hit_ticker,
uint64_t* block_cache_miss_stats, uint64_t* block_cache_miss_stats,
uint64_t* block_cache_hit_stats, uint64_t* block_cache_hit_stats,
@ -165,7 +164,7 @@ Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_hit_count, 1); PERF_COUNTER_ADD(block_cache_hit_count, 1);
PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1, PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,
static_cast<uint32_t>(level)); static_cast<uint32_t>(level));
if (get_context != nullptr) { if (get_context != nullptr) {
// overall cache hit // overall cache hit
get_context->get_context_stats_.num_cache_hit++; get_context->get_context_stats_.num_cache_hit++;
@ -184,7 +183,7 @@ Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
} }
} else { } else {
PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1, PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,
static_cast<uint32_t>(level)); static_cast<uint32_t>(level));
if (get_context != nullptr) { if (get_context != nullptr) {
// overall cache miss // overall cache miss
get_context->get_context_stats_.num_cache_miss++; get_context->get_context_stats_.num_cache_miss++;
@ -636,9 +635,8 @@ void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) {
} }
} }
void BlockBasedTable::GenerateCachePrefix(Cache* cc, void BlockBasedTable::GenerateCachePrefix(Cache* cc, RandomAccessFile* file,
RandomAccessFile* file, char* buffer, size_t* size) { char* buffer, size_t* size) {
// generate an id from the file // generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
@ -650,9 +648,8 @@ void BlockBasedTable::GenerateCachePrefix(Cache* cc,
} }
} }
void BlockBasedTable::GenerateCachePrefix(Cache* cc, void BlockBasedTable::GenerateCachePrefix(Cache* cc, WritableFile* file,
WritableFile* file, char* buffer, size_t* size) { char* buffer, size_t* size) {
// generate an id from the file // generate an id from the file
*size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize); *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
@ -1706,8 +1703,8 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
Statistics* statistics = rep_->ioptions.statistics; Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle = GetEntryFromCache( auto cache_handle = GetEntryFromCache(
block_cache, key, rep_->level, block_cache, key, rep_->level, BLOCK_CACHE_FILTER_MISS,
BLOCK_CACHE_FILTER_MISS, BLOCK_CACHE_FILTER_HIT, BLOCK_CACHE_FILTER_HIT,
get_context ? &get_context->get_context_stats_.num_cache_filter_miss get_context ? &get_context->get_context_stats_.num_cache_filter_miss
: nullptr, : nullptr,
get_context ? &get_context->get_context_stats_.num_cache_filter_hit get_context ? &get_context->get_context_stats_.num_cache_filter_hit
@ -1717,8 +1714,8 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
FilterBlockReader* filter = nullptr; FilterBlockReader* filter = nullptr;
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_filter_hit_count, 1); PERF_COUNTER_ADD(block_cache_filter_hit_count, 1);
filter = reinterpret_cast<FilterBlockReader*>( filter =
block_cache->Value(cache_handle)); reinterpret_cast<FilterBlockReader*>(block_cache->Value(cache_handle));
} else if (no_io) { } else if (no_io) {
// Do not invoke any io. // Do not invoke any io.
return CachableEntry<FilterBlockReader>(); return CachableEntry<FilterBlockReader>();
@ -1754,7 +1751,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
} }
} }
return { filter, cache_handle }; return {filter, cache_handle};
} }
BlockBasedTable::CachableEntry<UncompressionDict> BlockBasedTable::CachableEntry<UncompressionDict>
@ -1868,8 +1865,8 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
rep_->dummy_index_reader_offset, cache_key); rep_->dummy_index_reader_offset, cache_key);
Statistics* statistics = rep_->ioptions.statistics; Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle = GetEntryFromCache( auto cache_handle = GetEntryFromCache(
block_cache, key, rep_->level, block_cache, key, rep_->level, BLOCK_CACHE_INDEX_MISS,
BLOCK_CACHE_INDEX_MISS, BLOCK_CACHE_INDEX_HIT, BLOCK_CACHE_INDEX_HIT,
get_context ? &get_context->get_context_stats_.num_cache_index_miss get_context ? &get_context->get_context_stats_.num_cache_index_miss
: nullptr, : nullptr,
get_context ? &get_context->get_context_stats_.num_cache_index_hit get_context ? &get_context->get_context_stats_.num_cache_index_hit
@ -1934,7 +1931,6 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
return NewErrorInternalIterator<BlockHandle>(s); return NewErrorInternalIterator<BlockHandle>(s);
} }
} }
} }
assert(cache_handle); assert(cache_handle);

@ -396,10 +396,10 @@ class BlockBasedTable : public TableReader {
static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size); static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size);
// Generate a cache key prefix from the file // Generate a cache key prefix from the file
static void GenerateCachePrefix(Cache* cc, static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file,
RandomAccessFile* file, char* buffer, size_t* size); char* buffer, size_t* size);
static void GenerateCachePrefix(Cache* cc, static void GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer,
WritableFile* file, char* buffer, size_t* size); size_t* size);
// Helper functions for DumpTable() // Helper functions for DumpTable()
Status DumpIndexBlock(WritableFile* out_file); Status DumpIndexBlock(WritableFile* out_file);

@ -64,14 +64,14 @@ BlockBuilder::BlockBuilder(
assert(0); assert(0);
} }
assert(block_restart_interval_ >= 1); assert(block_restart_interval_ >= 1);
restarts_.push_back(0); // First restart point is at offset 0 restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t); estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
} }
void BlockBuilder::Reset() { void BlockBuilder::Reset() {
buffer_.clear(); buffer_.clear();
restarts_.clear(); restarts_.clear();
restarts_.push_back(0); // First restart point is at offset 0 restarts_.push_back(0); // First restart point is at offset 0
estimate_ = sizeof(uint32_t) + sizeof(uint32_t); estimate_ = sizeof(uint32_t) + sizeof(uint32_t);
counter_ = 0; counter_ = 0;
finished_ = false; finished_ = false;
@ -81,8 +81,8 @@ void BlockBuilder::Reset() {
} }
} }
size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, const Slice& value) size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key,
const { const Slice& value) const {
size_t estimate = CurrentSizeEstimate(); size_t estimate = CurrentSizeEstimate();
// Note: this is an imprecise estimate as it accounts for the whole key size // Note: this is an imprecise estimate as it accounts for the whole key size
// instead of non-shared key size. // instead of non-shared key size.
@ -95,13 +95,13 @@ size_t BlockBuilder::EstimateSizeAfterKV(const Slice& key, const Slice& value)
: value.size() / 2; : value.size() / 2;
if (counter_ >= block_restart_interval_) { if (counter_ >= block_restart_interval_) {
estimate += sizeof(uint32_t); // a new restart entry. estimate += sizeof(uint32_t); // a new restart entry.
} }
estimate += sizeof(int32_t); // varint for shared prefix length. estimate += sizeof(int32_t); // varint for shared prefix length.
// Note: this is an imprecise estimate as we will have to encoded size, one // Note: this is an imprecise estimate as we will have to encoded size, one
// for shared key and one for non-shared key. // for shared key and one for non-shared key.
estimate += VarintLength(key.size()); // varint for key length. estimate += VarintLength(key.size()); // varint for key length.
if (!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)) { if (!use_value_delta_encoding_ || (counter_ >= block_restart_interval_)) {
estimate += VarintLength(value.size()); // varint for value length. estimate += VarintLength(value.size()); // varint for value length.
} }

@ -54,23 +54,21 @@ class BlockBuilder {
size_t EstimateSizeAfterKV(const Slice& key, const Slice& value) const; size_t EstimateSizeAfterKV(const Slice& key, const Slice& value) const;
// Return true iff no entries have been added since the last Reset() // Return true iff no entries have been added since the last Reset()
bool empty() const { bool empty() const { return buffer_.empty(); }
return buffer_.empty();
}
private: private:
const int block_restart_interval_; const int block_restart_interval_;
// TODO(myabandeh): put it into a separate IndexBlockBuilder // TODO(myabandeh): put it into a separate IndexBlockBuilder
const bool use_delta_encoding_; const bool use_delta_encoding_;
// Refer to BlockIter::DecodeCurrentValue for format of delta encoded values // Refer to BlockIter::DecodeCurrentValue for format of delta encoded values
const bool use_value_delta_encoding_; const bool use_value_delta_encoding_;
std::string buffer_; // Destination buffer std::string buffer_; // Destination buffer
std::vector<uint32_t> restarts_; // Restart points std::vector<uint32_t> restarts_; // Restart points
size_t estimate_; size_t estimate_;
int counter_; // Number of entries emitted since restart int counter_; // Number of entries emitted since restart
bool finished_; // Has Finish() been called? bool finished_; // Has Finish() been called?
std::string last_key_; std::string last_key_;
DataBlockHashIndexBuilder data_block_hash_index_builder_; DataBlockHashIndexBuilder data_block_hash_index_builder_;
}; };

@ -9,8 +9,8 @@
#include "table/block_fetcher.h" #include "table/block_fetcher.h"
#include <string>
#include <inttypes.h> #include <inttypes.h>
#include <string>
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
@ -31,8 +31,7 @@
namespace rocksdb { namespace rocksdb {
inline inline void BlockFetcher::CheckBlockChecksum() {
void BlockFetcher::CheckBlockChecksum() {
// Check the crc of the type and the block contents // Check the crc of the type and the block contents
if (read_options_.verify_checksums) { if (read_options_.verify_checksums) {
const char* data = slice_.data(); // Pointer to where Read put the data const char* data = slice_.data(); // Pointer to where Read put the data
@ -50,10 +49,9 @@ void BlockFetcher::CheckBlockChecksum() {
actual = XXH32(data, static_cast<int>(block_size_) + 1, 0); actual = XXH32(data, static_cast<int>(block_size_) + 1, 0);
break; break;
case kxxHash64: case kxxHash64:
actual =static_cast<uint32_t> ( actual = static_cast<uint32_t>(
XXH64(data, static_cast<int>(block_size_) + 1, 0) & XXH64(data, static_cast<int>(block_size_) + 1, 0) &
uint64_t{0xffffffff} uint64_t{0xffffffff});
);
break; break;
default: default:
status_ = Status::Corruption( status_ = Status::Corruption(
@ -70,8 +68,7 @@ void BlockFetcher::CheckBlockChecksum() {
} }
} }
inline inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
if (cache_options_.persistent_cache && if (cache_options_.persistent_cache &&
!cache_options_.persistent_cache->IsCompressed()) { !cache_options_.persistent_cache->IsCompressed()) {
Status status = PersistentCacheHelper::LookupUncompressedPage( Status status = PersistentCacheHelper::LookupUncompressedPage(
@ -92,8 +89,7 @@ bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
return false; return false;
} }
inline inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
bool BlockFetcher::TryGetFromPrefetchBuffer() {
if (prefetch_buffer_ != nullptr && if (prefetch_buffer_ != nullptr &&
prefetch_buffer_->TryReadFromCache( prefetch_buffer_->TryReadFromCache(
handle_.offset(), handle_.offset(),
@ -109,8 +105,7 @@ bool BlockFetcher::TryGetFromPrefetchBuffer() {
return got_from_prefetch_buffer_; return got_from_prefetch_buffer_;
} }
inline inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
if (cache_options_.persistent_cache && if (cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) { cache_options_.persistent_cache->IsCompressed()) {
// lookup uncompressed cache mode p-cache // lookup uncompressed cache mode p-cache
@ -132,8 +127,7 @@ bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
return false; return false;
} }
inline inline void BlockFetcher::PrepareBufferForBlockFromFile() {
void BlockFetcher::PrepareBufferForBlockFromFile() {
// cache miss read from device // cache miss read from device
if (do_uncompress_ && if (do_uncompress_ &&
block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) { block_size_ + kBlockTrailerSize < kDefaultStackBufferSize) {
@ -151,8 +145,7 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
} }
} }
inline inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
if (status_.ok() && read_options_.fill_cache && if (status_.ok() && read_options_.fill_cache &&
cache_options_.persistent_cache && cache_options_.persistent_cache &&
cache_options_.persistent_cache->IsCompressed()) { cache_options_.persistent_cache->IsCompressed()) {
@ -162,8 +155,7 @@ void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
} }
} }
inline inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache && if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache &&
cache_options_.persistent_cache && cache_options_.persistent_cache &&
!cache_options_.persistent_cache->IsCompressed()) { !cache_options_.persistent_cache->IsCompressed()) {
@ -179,8 +171,7 @@ inline void BlockFetcher::CopyBufferToHeap() {
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize); memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
} }
inline inline void BlockFetcher::GetBlockContents() {
void BlockFetcher::GetBlockContents() {
if (slice_.data() != used_buf_) { if (slice_.data() != used_buf_) {
// the slice content is not the buffer provided // the slice content is not the buffer provided
*contents_ = BlockContents(Slice(slice_.data(), block_size_)); *contents_ = BlockContents(Slice(slice_.data(), block_size_));

@ -41,9 +41,7 @@ inline uint32_t PrefixToBucket(const Slice& prefix, uint32_t num_buckets) {
const uint32_t kNoneBlock = 0x7FFFFFFF; const uint32_t kNoneBlock = 0x7FFFFFFF;
const uint32_t kBlockArrayMask = 0x80000000; const uint32_t kBlockArrayMask = 0x80000000;
inline bool IsNone(uint32_t block_id) { inline bool IsNone(uint32_t block_id) { return block_id == kNoneBlock; }
return block_id == kNoneBlock;
}
inline bool IsBlockId(uint32_t block_id) { inline bool IsBlockId(uint32_t block_id) {
return (block_id & kBlockArrayMask) == 0; return (block_id & kBlockArrayMask) == 0;
@ -74,10 +72,9 @@ class BlockPrefixIndex::Builder {
explicit Builder(const SliceTransform* internal_prefix_extractor) explicit Builder(const SliceTransform* internal_prefix_extractor)
: internal_prefix_extractor_(internal_prefix_extractor) {} : internal_prefix_extractor_(internal_prefix_extractor) {}
void Add(const Slice& key_prefix, uint32_t start_block, void Add(const Slice& key_prefix, uint32_t start_block, uint32_t num_blocks) {
uint32_t num_blocks) {
PrefixRecord* record = reinterpret_cast<PrefixRecord*>( PrefixRecord* record = reinterpret_cast<PrefixRecord*>(
arena_.AllocateAligned(sizeof(PrefixRecord))); arena_.AllocateAligned(sizeof(PrefixRecord)));
record->prefix = key_prefix; record->prefix = key_prefix;
record->start_block = start_block; record->start_block = start_block;
record->end_block = start_block + num_blocks - 1; record->end_block = start_block + num_blocks - 1;
@ -169,7 +166,6 @@ class BlockPrefixIndex::Builder {
Arena arena_; Arena arena_;
}; };
Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor, Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor,
const Slice& prefixes, const Slice& prefix_meta, const Slice& prefixes, const Slice& prefix_meta,
BlockPrefixIndex** prefix_index) { BlockPrefixIndex** prefix_index) {
@ -191,7 +187,7 @@ Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor,
} }
if (pos + prefix_size > prefixes.size()) { if (pos + prefix_size > prefixes.size()) {
s = Status::Corruption( s = Status::Corruption(
"Corrupted prefix meta block: size inconsistency."); "Corrupted prefix meta block: size inconsistency.");
break; break;
} }
Slice prefix(prefixes.data() + pos, prefix_size); Slice prefix(prefixes.data() + pos, prefix_size);
@ -211,8 +207,7 @@ Status BlockPrefixIndex::Create(const SliceTransform* internal_prefix_extractor,
return s; return s;
} }
uint32_t BlockPrefixIndex::GetBlocks(const Slice& key, uint32_t BlockPrefixIndex::GetBlocks(const Slice& key, uint32_t** blocks) {
uint32_t** blocks) {
Slice prefix = internal_prefix_extractor_->Transform(key); Slice prefix = internal_prefix_extractor_->Transform(key);
uint32_t bucket = PrefixToBucket(prefix, num_buckets_); uint32_t bucket = PrefixToBucket(prefix, num_buckets_);
@ -226,7 +221,7 @@ uint32_t BlockPrefixIndex::GetBlocks(const Slice& key,
} else { } else {
uint32_t index = DecodeIndex(block_id); uint32_t index = DecodeIndex(block_id);
assert(index < num_block_array_buffer_entries_); assert(index < num_block_array_buffer_entries_);
*blocks = &block_array_buffer_[index+1]; *blocks = &block_array_buffer_[index + 1];
uint32_t num_blocks = block_array_buffer_[index]; uint32_t num_blocks = block_array_buffer_[index];
assert(num_blocks > 1); assert(num_blocks > 1);
assert(index + num_blocks < num_block_array_buffer_entries_); assert(index + num_blocks < num_block_array_buffer_entries_);

@ -19,7 +19,6 @@ class SliceTransform;
// that index block. // that index block.
class BlockPrefixIndex { class BlockPrefixIndex {
public: public:
// Maps a key to a list of data blocks that could potentially contain // Maps a key to a list of data blocks that could potentially contain
// the key, based on the prefix. // the key, based on the prefix.
// Returns the total number of relevant blocks, 0 means the key does // Returns the total number of relevant blocks, 0 means the key does
@ -28,7 +27,7 @@ class BlockPrefixIndex {
size_t ApproximateMemoryUsage() const { size_t ApproximateMemoryUsage() const {
return sizeof(BlockPrefixIndex) + return sizeof(BlockPrefixIndex) +
(num_block_array_buffer_entries_ + num_buckets_) * sizeof(uint32_t); (num_block_array_buffer_entries_ + num_buckets_) * sizeof(uint32_t);
} }
// Create hash index by reading from the metadata blocks. // Create hash index by reading from the metadata blocks.
@ -48,8 +47,7 @@ class BlockPrefixIndex {
friend Builder; friend Builder;
BlockPrefixIndex(const SliceTransform* internal_prefix_extractor, BlockPrefixIndex(const SliceTransform* internal_prefix_extractor,
uint32_t num_buckets, uint32_t num_buckets, uint32_t* buckets,
uint32_t* buckets,
uint32_t num_block_array_buffer_entries, uint32_t num_block_array_buffer_entries,
uint32_t* block_array_buffer) uint32_t* block_array_buffer)
: internal_prefix_extractor_(internal_prefix_extractor), : internal_prefix_extractor_(internal_prefix_extractor),

@ -12,13 +12,13 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/write_batch_internal.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/table.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "table/block.h" #include "table/block.h"
#include "table/block_builder.h" #include "table/block_builder.h"
#include "table/format.h" #include "table/format.h"
@ -28,7 +28,7 @@
namespace rocksdb { namespace rocksdb {
static std::string RandomString(Random* rnd, int len) { static std::string RandomString(Random *rnd, int len) {
std::string r; std::string r;
test::RandomString(rnd, len, &r); test::RandomString(rnd, len, &r);
return r; return r;
@ -123,8 +123,7 @@ TEST_F(BlockTest, SimpleTest) {
int count = 0; int count = 0;
InternalIterator *iter = InternalIterator *iter =
reader.NewIterator<DataBlockIter>(options.comparator, options.comparator); reader.NewIterator<DataBlockIter>(options.comparator, options.comparator);
for (iter->SeekToFirst();iter->Valid(); count++, iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); count++, iter->Next()) {
// read kv from block // read kv from block
Slice k = iter->key(); Slice k = iter->key();
Slice v = iter->value(); Slice v = iter->value();
@ -139,7 +138,6 @@ TEST_F(BlockTest, SimpleTest) {
iter = iter =
reader.NewIterator<DataBlockIter>(options.comparator, options.comparator); reader.NewIterator<DataBlockIter>(options.comparator, options.comparator);
for (int i = 0; i < num_records; i++) { for (int i = 0; i < num_records; i++) {
// find a random key in the lookaside array // find a random key in the lookaside array
int index = rnd.Uniform(num_records); int index = rnd.Uniform(num_records);
Slice k(keys[index]); Slice k(keys[index]);
@ -375,9 +373,9 @@ class BlockReadAmpBitmapSlowAndAccurate {
TEST_F(BlockTest, BlockReadAmpBitmap) { TEST_F(BlockTest, BlockReadAmpBitmap) {
uint32_t pin_offset = 0; uint32_t pin_offset = 0;
SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack(
"BlockReadAmpBitmap:rnd", [&pin_offset](void* arg) { "BlockReadAmpBitmap:rnd", [&pin_offset](void *arg) {
pin_offset = *(static_cast<uint32_t*>(arg)); pin_offset = *(static_cast<uint32_t *>(arg));
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
std::vector<size_t> block_sizes = { std::vector<size_t> block_sizes = {
1, // 1 byte 1, // 1 byte
@ -443,11 +441,11 @@ TEST_F(BlockTest, BlockReadAmpBitmap) {
size_t total_bits = 0; size_t total_bits = 0;
for (size_t bit_idx = 0; bit_idx < needed_bits; bit_idx++) { for (size_t bit_idx = 0; bit_idx < needed_bits; bit_idx++) {
total_bits += read_amp_slow_and_accurate.IsPinMarked( total_bits += read_amp_slow_and_accurate.IsPinMarked(
bit_idx * kBytesPerBit + pin_offset); bit_idx * kBytesPerBit + pin_offset);
} }
size_t expected_estimate_useful = total_bits * kBytesPerBit; size_t expected_estimate_useful = total_bits * kBytesPerBit;
size_t got_estimate_useful = size_t got_estimate_useful =
stats->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES); stats->getTickerCount(READ_AMP_ESTIMATE_USEFUL_BYTES);
ASSERT_EQ(expected_estimate_useful, got_estimate_useful); ASSERT_EQ(expected_estimate_useful, got_estimate_useful);
} }
} }

@ -9,8 +9,8 @@
#include "table/format.h" #include "table/format.h"
#include <string>
#include <inttypes.h> #include <inttypes.h>
#include <string>
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
@ -56,8 +56,7 @@ void BlockHandle::EncodeTo(std::string* dst) const {
} }
Status BlockHandle::DecodeFrom(Slice* input) { Status BlockHandle::DecodeFrom(Slice* input) {
if (GetVarint64(input, &offset_) && if (GetVarint64(input, &offset_) && GetVarint64(input, &size_)) {
GetVarint64(input, &size_)) {
return Status::OK(); return Status::OK();
} else { } else {
// reset in case failure after partially decoding // reset in case failure after partially decoding
@ -159,7 +158,7 @@ Status Footer::DecodeFrom(Slice* input) {
assert(input != nullptr); assert(input != nullptr);
assert(input->size() >= kMinEncodedLength); assert(input->size() >= kMinEncodedLength);
const char *magic_ptr = const char* magic_ptr =
input->data() + input->size() - kMagicNumberLengthByte; input->data() + input->size() - kMagicNumberLengthByte;
const uint32_t magic_lo = DecodeFixed32(magic_ptr); const uint32_t magic_lo = DecodeFixed32(magic_ptr);
const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4); const uint32_t magic_hi = DecodeFixed32(magic_ptr + 4);
@ -234,9 +233,10 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
uint64_t file_size, Footer* footer, uint64_t file_size, Footer* footer,
uint64_t enforce_table_magic_number) { uint64_t enforce_table_magic_number) {
if (file_size < Footer::kMinEncodedLength) { if (file_size < Footer::kMinEncodedLength) {
return Status::Corruption( return Status::Corruption("file is too short (" + ToString(file_size) +
"file is too short (" + ToString(file_size) + " bytes) to be an " " bytes) to be an "
"sstable: " + file->file_name()); "sstable: " +
file->file_name());
} }
char footer_space[Footer::kMaxEncodedLength]; char footer_space[Footer::kMaxEncodedLength];
@ -257,9 +257,10 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
// Check that we actually read the whole footer from the file. It may be // Check that we actually read the whole footer from the file. It may be
// that size isn't correct. // that size isn't correct.
if (footer_input.size() < Footer::kMinEncodedLength) { if (footer_input.size() < Footer::kMinEncodedLength) {
return Status::Corruption( return Status::Corruption("file is too short (" + ToString(file_size) +
"file is too short (" + ToString(file_size) + " bytes) to be an " " bytes) to be an "
"sstable" + file->file_name()); "sstable" +
file->file_name());
} }
s = footer->DecodeFrom(&footer_input); s = footer->DecodeFrom(&footer_input);
@ -269,10 +270,9 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
if (enforce_table_magic_number != 0 && if (enforce_table_magic_number != 0 &&
enforce_table_magic_number != footer->table_magic_number()) { enforce_table_magic_number != footer->table_magic_number()) {
return Status::Corruption( return Status::Corruption(
"Bad table magic number: expected " "Bad table magic number: expected " +
+ ToString(enforce_table_magic_number) + ", found " ToString(enforce_table_magic_number) + ", found " +
+ ToString(footer->table_magic_number()) ToString(footer->table_magic_number()) + " in " + file->file_name());
+ " in " + file->file_name());
} }
return Status::OK(); return Status::OK();
} }
@ -286,14 +286,14 @@ Status UncompressBlockContentsForCompressionType(
assert(uncompression_info.type() != kNoCompression && assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type"); "Invalid compression type");
StopWatchNano timer(ioptions.env, StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); ioptions.env, ioptions.statistics));
int decompress_size = 0; int decompress_size = 0;
switch (uncompression_info.type()) { switch (uncompression_info.type()) {
case kSnappyCompression: { case kSnappyCompression: {
size_t ulength = 0; size_t ulength = 0;
static char snappy_corrupt_msg[] = static char snappy_corrupt_msg[] =
"Snappy not supported or corrupted Snappy compressed block contents"; "Snappy not supported or corrupted Snappy compressed block contents";
if (!Snappy_GetUncompressedLength(data, n, &ulength)) { if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg); return Status::Corruption(snappy_corrupt_msg);
} }
@ -311,7 +311,7 @@ Status UncompressBlockContentsForCompressionType(
allocator); allocator);
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents"; "Zlib not supported or corrupted Zlib compressed block contents";
return Status::Corruption(zlib_corrupt_msg); return Status::Corruption(zlib_corrupt_msg);
} }
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
@ -323,7 +323,7 @@ Status UncompressBlockContentsForCompressionType(
allocator); allocator);
if (!ubuf) { if (!ubuf) {
static char bzip2_corrupt_msg[] = static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents"; "Bzip2 not supported or corrupted Bzip2 compressed block contents";
return Status::Corruption(bzip2_corrupt_msg); return Status::Corruption(bzip2_corrupt_msg);
} }
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
@ -335,7 +335,7 @@ Status UncompressBlockContentsForCompressionType(
allocator); allocator);
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents"; "LZ4 not supported or corrupted LZ4 compressed block contents";
return Status::Corruption(lz4_corrupt_msg); return Status::Corruption(lz4_corrupt_msg);
} }
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
@ -347,7 +347,7 @@ Status UncompressBlockContentsForCompressionType(
allocator); allocator);
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents"; "LZ4HC not supported or corrupted LZ4HC compressed block contents";
return Status::Corruption(lz4hc_corrupt_msg); return Status::Corruption(lz4hc_corrupt_msg);
} }
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
@ -358,7 +358,8 @@ Status UncompressBlockContentsForCompressionType(
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size)); ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) { if (!ubuf) {
static char xpress_corrupt_msg[] = static char xpress_corrupt_msg[] =
"XPRESS not supported or corrupted XPRESS compressed block contents"; "XPRESS not supported or corrupted XPRESS compressed block "
"contents";
return Status::Corruption(xpress_corrupt_msg); return Status::Corruption(xpress_corrupt_msg);
} }
*contents = BlockContents(std::move(ubuf), decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size);
@ -378,7 +379,7 @@ Status UncompressBlockContentsForCompressionType(
return Status::Corruption("bad block type"); return Status::Corruption("bad block type");
} }
if(ShouldReportDetailedTime(ioptions.env, ioptions.statistics)){ if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS, RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos()); timer.ElapsedNanos());
} }

@ -76,8 +76,8 @@ class BlockHandle {
static const BlockHandle kNullBlockHandle; static const BlockHandle kNullBlockHandle;
}; };
inline uint32_t GetCompressFormatForVersion( inline uint32_t GetCompressFormatForVersion(CompressionType compression_type,
CompressionType compression_type, uint32_t version) { uint32_t version) {
#ifdef NDEBUG #ifdef NDEBUG
(void)compression_type; (void)compression_type;
#endif #endif
@ -195,7 +195,7 @@ inline CompressionType get_block_compression_type(const char* block_data,
} }
struct BlockContents { struct BlockContents {
Slice data; // Actual contents of data Slice data; // Actual contents of data
CacheAllocationPtr allocation; CacheAllocationPtr allocation;
#ifndef NDEBUG #ifndef NDEBUG

Loading…
Cancel
Save