Fix data race #3

Summary: Added requirement that ComputeCompactionScore() be executed in mutex, since it's accessing being_compacted bool, which can be mutated by other threads. Also added more comments about thread safety of FileMetaData, since it was a bit confusing. However, it seems that FileMetaData doesn't have data races (except being_compacted)

Test Plan: Ran 100 ConvertCompactionStyle tests with thread sanitizer. On master -- some failures. With this patch -- none.

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D32283
main
Igor Canadi 10 years ago
parent e63140d52b
commit e39f4f6cf9
  1. 21
      db/compaction_picker.cc
  2. 7
      db/compaction_picker.h
  3. 2
      db/compaction_picker_test.cc
  4. 2
      db/version_builder_test.cc
  5. 4
      db/version_edit.h
  6. 47
      db/version_set.cc
  7. 13
      db/version_set.h

@ -23,6 +23,7 @@
namespace rocksdb { namespace rocksdb {
namespace {
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) { uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0; uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) { for (size_t i = 0; i < files.size() && files[i]; i++) {
@ -31,7 +32,6 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
return sum; return sum;
} }
namespace {
// Determine compression type, based on user options, level of the output // Determine compression type, based on user options, level of the output
// file and whether compression is disabled. // file and whether compression is disabled.
// If enable_compression is false, then compression is always disabled no // If enable_compression is false, then compression is always disabled no
@ -71,19 +71,6 @@ CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
CompactionPicker::~CompactionPicker() {} CompactionPicker::~CompactionPicker() {}
void CompactionPicker::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
for (int level = 0; level < NumberLevels() - 1; level++) {
uint64_t total = 0;
for (auto c : compactions_in_progress_[level]) {
assert(c->level() == level);
for (size_t i = 0; i < c->num_input_files(0); i++) {
total += c->input(0, i)->compensated_file_size;
}
}
sizes[level] = total;
}
}
// Clear all files to indicate that they are not being compacted // Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions. // Delete this compaction from the list of running compactions.
void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
@ -763,13 +750,9 @@ Compaction* LevelCompactionPicker::PickCompaction(
// being compacted). Since we just changed compaction score, we recalculate it // being compacted). Since we just changed compaction score, we recalculate it
// here // here
{ // this piece of code recomputes compaction score { // this piece of code recomputes compaction score
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
CompactionOptionsFIFO dummy_compaction_options_fifo; CompactionOptionsFIFO dummy_compaction_options_fifo;
vstorage->ComputeCompactionScore(mutable_cf_options, vstorage->ComputeCompactionScore(mutable_cf_options,
dummy_compaction_options_fifo, dummy_compaction_options_fifo);
size_being_compacted);
} }
return c; return c;

@ -91,10 +91,6 @@ class CompactionPicker {
// Free up the files that participated in a compaction // Free up the files that participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status); void ReleaseCompactionFiles(Compaction* c, Status status);
// Return the total amount of data that is undergoing
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
// Returns true if any one of the specified files are being compacted // Returns true if any one of the specified files are being compacted
bool FilesInCompaction(const std::vector<FileMetaData*>& files); bool FilesInCompaction(const std::vector<FileMetaData*>& files);
@ -314,7 +310,4 @@ class NullCompactionPicker : public CompactionPicker {
}; };
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
// Utility function
extern uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files);
} // namespace rocksdb } // namespace rocksdb

@ -83,8 +83,6 @@ class CompactionPickerTest {
} }
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_->ComputeCompactionScore(mutable_cf_options_, fifo_options_,
size_being_compacted_);
vstorage_->UpdateFilesBySize(); vstorage_->UpdateFilesBySize();
vstorage_->UpdateNumNonEmptyLevels(); vstorage_->UpdateNumNonEmptyLevels();
vstorage_->GenerateFileIndexer(); vstorage_->GenerateFileIndexer();

@ -74,8 +74,6 @@ class VersionBuilderTest {
} }
void UpdateVersionStorageInfo() { void UpdateVersionStorageInfo() {
vstorage_.ComputeCompactionScore(mutable_cf_options_, fifo_options_,
size_being_compacted_);
vstorage_.UpdateFilesBySize(); vstorage_.UpdateFilesBySize();
vstorage_.UpdateNumNonEmptyLevels(); vstorage_.UpdateNumNonEmptyLevels();
vstorage_.GenerateFileIndexer(); vstorage_.GenerateFileIndexer();

@ -76,8 +76,10 @@ struct FileMetaData {
// File size compensated by deletion entry. // File size compensated by deletion entry.
// This is updated in Version::UpdateAccumulatedStats() first time when the // This is updated in Version::UpdateAccumulatedStats() first time when the
// file is created or loaded. After it is updated, it is immutable. // file is created or loaded. After it is updated (!= 0), it is immutable.
uint64_t compensated_file_size; uint64_t compensated_file_size;
// These values can mutate, but they can only be read or written from
// single-threaded LogAndApply thread
uint64_t num_entries; // the number of entries. uint64_t num_entries; // the number of entries.
uint64_t num_deletions; // the number of deletion entries. uint64_t num_deletions; // the number of deletion entries.
uint64_t raw_key_size; // total uncompressed key size. uint64_t raw_key_size; // total uncompressed key size.

@ -850,12 +850,8 @@ void VersionStorageInfo::GenerateLevelFilesBrief() {
} }
} }
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, void Version::PrepareApply() {
std::vector<uint64_t>& size_being_compacted) {
UpdateAccumulatedStats(); UpdateAccumulatedStats();
storage_info_.ComputeCompactionScore(
mutable_cf_options, cfd_->ioptions()->compaction_options_fifo,
size_being_compacted);
storage_info_.UpdateFilesBySize(); storage_info_.UpdateFilesBySize();
storage_info_.UpdateNumNonEmptyLevels(); storage_info_.UpdateNumNonEmptyLevels();
storage_info_.GenerateFileIndexer(); storage_info_.GenerateFileIndexer();
@ -947,7 +943,9 @@ void VersionStorageInfo::ComputeCompensatedSizes() {
for (int level = 0; level < num_levels_; level++) { for (int level = 0; level < num_levels_; level++) {
for (auto* file_meta : files_[level]) { for (auto* file_meta : files_[level]) {
// Here we only compute compensated_file_size for those file_meta // Here we only compute compensated_file_size for those file_meta
// which compensated_file_size is uninitialized (== 0). // which compensated_file_size is uninitialized (== 0). This is true only
// for files that have been created right now and no other thread has
// access to them. That's why we can safely mutate compensated_file_size.
if (file_meta->compensated_file_size == 0) { if (file_meta->compensated_file_size == 0) {
file_meta->compensated_file_size = file_meta->fd.GetFileSize() + file_meta->compensated_file_size = file_meta->fd.GetFileSize() +
file_meta->num_deletions * average_value_size * file_meta->num_deletions * average_value_size *
@ -966,8 +964,7 @@ int VersionStorageInfo::MaxInputLevel() const {
void VersionStorageInfo::ComputeCompactionScore( void VersionStorageInfo::ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const CompactionOptionsFIFO& compaction_options_fifo, const CompactionOptionsFIFO& compaction_options_fifo) {
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0; double max_score = 0;
int max_score_level = 0; int max_score_level = 0;
@ -1008,9 +1005,13 @@ void VersionStorageInfo::ComputeCompactionScore(
} }
} else { } else {
// Compute the ratio of current size to size limit. // Compute the ratio of current size to size limit.
const uint64_t level_bytes = uint64_t level_bytes_no_compacting = 0;
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; for (auto f : files_[level]) {
score = static_cast<double>(level_bytes) / if (f && f->being_compacted == false) {
level_bytes_no_compacting += f->compensated_file_size;
}
}
score = static_cast<double>(level_bytes_no_compacting) /
mutable_cf_options.MaxBytesForLevel(level); mutable_cf_options.MaxBytesForLevel(level);
if (max_score < score) { if (max_score < score) {
max_score = score; max_score = score;
@ -1527,6 +1528,11 @@ VersionSet::~VersionSet() {
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Version* v) { Version* v) {
// compute new compaction score
v->storage_info()->ComputeCompactionScore(
*column_family_data->GetLatestMutableCFOptions(),
column_family_data->ioptions()->compaction_options_fifo);
// Mark v finalized // Mark v finalized
v->storage_info_.SetFinalized(); v->storage_info_.SetFinalized();
@ -1637,13 +1643,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Unlock during expensive operations. New writes cannot get here // Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued. // because &w is ensuring that all new writes get queued.
{ {
std::vector<uint64_t> size_being_compacted;
if (!edit->IsColumnFamilyManipulation()) {
size_being_compacted.resize(v->storage_info()->num_levels() - 1);
// calculate the amount of data being compacted at every level
column_family_data->compaction_picker()->SizeBeingCompacted(
size_being_compacted);
}
mu->Unlock(); mu->Unlock();
@ -1674,7 +1673,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) { if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex. // This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(mutable_cf_options, size_being_compacted); v->PrepareApply();
} }
// Write new record to MANIFEST log // Write new record to MANIFEST log
@ -2097,10 +2096,7 @@ Status VersionSet::Recover(
builder->SaveTo(v->storage_info()); builder->SaveTo(v->storage_info());
// Install recovered version // Install recovered version
std::vector<uint64_t> size_being_compacted( v->PrepareApply();
v->storage_info()->num_levels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
AppendVersion(cfd, v); AppendVersion(cfd, v);
} }
@ -2434,10 +2430,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
Version* v = new Version(cfd, this, current_version_number_++); Version* v = new Version(cfd, this, current_version_number_++);
builder->SaveTo(v->storage_info()); builder->SaveTo(v->storage_info());
std::vector<uint64_t> size_being_compacted( v->PrepareApply();
v->storage_info()->num_levels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
printf("--------------- Column family \"%s\" (ID %u) --------------\n", printf("--------------- Column family \"%s\" (ID %u) --------------\n",
cfd->GetName().c_str(), (unsigned int)cfd->GetID()); cfd->GetName().c_str(), (unsigned int)cfd->GetID());

@ -113,13 +113,11 @@ class VersionStorageInfo {
// Updates internal structures that keep track of compaction scores // Updates internal structures that keep track of compaction scores
// We use compaction scores to figure out which compaction to do next // We use compaction scores to figure out which compaction to do next
// REQUIRES: If Version is not yet saved to current_, it can be called without // REQUIRES: db_mutex held!!
// a lock. Once a version is saved to current_, call only with mutex held
// TODO find a better way to pass compaction_options_fifo. // TODO find a better way to pass compaction_options_fifo.
void ComputeCompactionScore( void ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options, const MutableCFOptions& mutable_cf_options,
const CompactionOptionsFIFO& compaction_options_fifo, const CompactionOptionsFIFO& compaction_options_fifo);
std::vector<uint64_t>& size_being_compacted);
// Generate level_files_brief_ from files_ // Generate level_files_brief_ from files_
void GenerateLevelFilesBrief(); void GenerateLevelFilesBrief();
@ -365,10 +363,9 @@ class Version {
Status* status, MergeContext* merge_context, Status* status, MergeContext* merge_context,
bool* value_found = nullptr); bool* value_found = nullptr);
// Update scores, pre-calculated variables. It needs to be called before // Loads some stats information from files. Call without mutex held. It needs
// applying the version to the version set. // to be called before applying the version to the version set.
void PrepareApply(const MutableCFOptions& mutable_cf_options, void PrepareApply();
std::vector<uint64_t>& size_being_compacted);
// Reference count management (so Versions do not disappear out from // Reference count management (so Versions do not disappear out from
// under live iterators) // under live iterators)

Loading…
Cancel
Save