Remove random writes from SST file ingestion (#4172)

Summary:
RocksDB used to store global_seqno in external SST files written by
SstFileWriter. During file ingestion, RocksDB uses `pwrite` to update the
`global_seqno`. Since random write is not supported in some non-POSIX compliant
file systems, external SST file ingestion is not supported on these file
systems. To address this limitation, we no longer update `global_seqno` during
file ingestion. Later RocksDB uses the MANIFEST and other information in table
properties to deduce global seqno for externally-ingested SST files.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4172

Differential Revision: D8961465

Pulled By: riversand963

fbshipit-source-id: 4382ec85270a96be5bc0cf33758ca2b167b05071
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent a11df583ec
commit 54de56844d
  1. 4
      db/compaction_picker_test.cc
  2. 18
      db/compaction_picker_universal.cc
  3. 4
      db/db_impl.cc
  4. 14
      db/db_impl_compaction_flush.cc
  5. 2
      db/db_impl_experimental.cc
  6. 2
      db/db_impl_open.cc
  7. 2
      db/db_range_del_test.cc
  8. 44
      db/external_sst_file_ingestion_job.cc
  9. 3
      db/external_sst_file_ingestion_job.h
  10. 2
      db/flush_job.cc
  11. 15
      db/flush_job_test.cc
  12. 2
      db/table_cache.cc
  13. 32
      db/version_builder.cc
  14. 4
      db/version_builder_test.cc
  15. 29
      db/version_edit.cc
  16. 35
      db/version_edit.h
  17. 24
      db/version_set.cc
  18. 12
      include/rocksdb/options.h
  19. 2
      table/block_based_table_factory.cc
  20. 80
      table/block_based_table_reader.cc
  21. 1
      table/block_based_table_reader.h
  22. 17
      table/table_builder.h
  23. 9
      table/table_test.cc
  24. 4
      tools/db_stress.cc

@ -90,8 +90,8 @@ class CompactionPickerTest : public testing::Test {
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
f->largest = InternalKey(largest, largest_seq, kTypeValue);
f->smallest_seqno = smallest_seq;
f->largest_seqno = largest_seq;
f->fd.smallest_seqno = smallest_seq;
f->fd.largest_seqno = largest_seq;
f->compensated_file_size = file_size;
f->refs = 0;
vstorage_->AddFile(level, f);

@ -97,17 +97,17 @@ void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
SequenceNumber* largest_seqno) {
bool is_first = true;
for (FileMetaData* f : files) {
assert(f->smallest_seqno <= f->largest_seqno);
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
*smallest_seqno = f->smallest_seqno;
*largest_seqno = f->largest_seqno;
*smallest_seqno = f->fd.smallest_seqno;
*largest_seqno = f->fd.largest_seqno;
} else {
if (f->smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->smallest_seqno;
if (f->fd.smallest_seqno < *smallest_seqno) {
*smallest_seqno = f->fd.smallest_seqno;
}
if (f->largest_seqno > *largest_seqno) {
*largest_seqno = f->largest_seqno;
if (f->fd.largest_seqno > *largest_seqno) {
*largest_seqno = f->fd.largest_seqno;
}
}
}
@ -365,11 +365,11 @@ Compaction* UniversalCompactionPicker::PickCompaction(
size_t level_index = 0U;
if (c->start_level() == 0) {
for (auto f : *c->inputs(0)) {
assert(f->smallest_seqno <= f->largest_seqno);
assert(f->fd.smallest_seqno <= f->fd.largest_seqno);
if (is_first) {
is_first = false;
}
prev_smallest_seqno = f->smallest_seqno;
prev_smallest_seqno = f->fd.smallest_seqno;
}
level_index = 1U;
}

@ -2932,8 +2932,8 @@ Status DBImpl::IngestExternalFile(
}
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
status = ingestion_job.Prepare(external_files, next_file_number,
super_version);
status =
ingestion_job.Prepare(external_files, next_file_number, super_version);
CleanupSuperVersion(super_version);
if (!status.ok()) {
InstrumentedMutexLock l(&mutex_);

@ -230,8 +230,8 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
@ -281,8 +281,8 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->smallest_seqno;
info.largest_seqno = file_meta->largest_seqno;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
@ -885,7 +885,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
@ -1804,8 +1804,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->smallest_seqno, f->largest_seqno,
f->marked_for_compaction);
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction);
ROCKS_LOG_BUFFER(
log_buffer,

@ -131,7 +131,7 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
edit.DeleteFile(0, f->fd.GetNumber());
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}

@ -969,7 +969,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (s.ok() && meta.fd.GetFileSize() > 0) {
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction);
}

@ -191,7 +191,7 @@ TEST_F(DBRangeDelTest, SentinelsOmittedFromOutputFile) {
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
ASSERT_GT(files[0][0].smallest_seqno, 0);
ASSERT_GT(files[0][0].fd.smallest_seqno, 0);
db_->ReleaseSnapshot(snapshot);
}

@ -30,8 +30,7 @@ namespace rocksdb {
Status ExternalSstFileIngestionJob::Prepare(
const std::vector<std::string>& external_files_paths,
uint64_t next_file_number,
SuperVersion* sv) {
uint64_t next_file_number, SuperVersion* sv) {
Status status;
// Read the information of files we are ingesting
@ -477,9 +476,9 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
const SequenceNumber level_largest_seqno =
(*max_element(level_files.begin(), level_files.end(),
[](FileMetaData* f1, FileMetaData* f2) {
return f1->largest_seqno < f2->largest_seqno;
return f1->fd.largest_seqno < f2->fd.largest_seqno;
}))
->largest_seqno;
->fd.largest_seqno;
// should only assign seqno to current level's largest seqno when
// the file fits
if (level_largest_seqno != 0 &&
@ -524,7 +523,7 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
// at some upper level
for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
for (auto file : vstorage->LevelFiles(lvl)) {
if (file->smallest_seqno == 0) {
if (file->fd.smallest_seqno == 0) {
return Status::InvalidArgument(
"Can't ingest_behind file as despite allow_ingest_behind=true "
"there are files with 0 seqno in database at upper levels!");
@ -549,24 +548,27 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
"field");
}
std::unique_ptr<RandomRWFile> rwfile;
Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
&rwfile, env_options_);
if (!status.ok()) {
return status;
if (ingestion_options_.write_global_seqno) {
// Determine if we can write global_seqno to a given offset of file.
// If the file system does not support random write, then we should not.
// Otherwise we should.
std::unique_ptr<RandomRWFile> rwfile;
Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
&rwfile, env_options_);
if (status.ok()) {
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (!status.ok()) {
return status;
}
} else if (!status.IsNotSupported()) {
return status;
}
}
// Write the new seqno in the global sequence number field in the file
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
if (status.ok()) {
status = rwfile->Fsync();
}
if (status.ok()) {
file_to_ingest->assigned_seqno = seqno;
}
return status;
file_to_ingest->assigned_seqno = seqno;
return Status::OK();
}
bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(

@ -89,8 +89,7 @@ class ExternalSstFileIngestionJob {
// Prepare the job by copying external files into the DB.
Status Prepare(const std::vector<std::string>& external_files_paths,
uint64_t next_file_number,
SuperVersion* sv);
uint64_t next_file_number, SuperVersion* sv);
// Check if we need to flush the memtable before running the ingestion job
// This will be true if the files we are ingesting are overlapping with any

@ -389,7 +389,7 @@ Status FlushJob::WriteLevel0Table() {
// Add file to L0
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.smallest_seqno, meta_.largest_seqno,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
}

@ -147,19 +147,20 @@ TEST_F(FlushJobTest, NonEmpty) {
db_options_.statistics.get(), &event_logger, true);
HistogramData hist;
FileMetaData fd;
FileMetaData file_meta;
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run(nullptr, &fd));
ASSERT_OK(flush_job.Run(nullptr, &file_meta));
mutex_.Unlock();
db_options_.statistics->histogramData(FLUSH_TIME, &hist);
ASSERT_GT(hist.average, 0.0);
ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
ASSERT_EQ("9999a",
fd.largest.user_key().ToString()); // range tombstone end key
ASSERT_EQ(1, fd.smallest_seqno);
ASSERT_EQ(10000, fd.largest_seqno); // range tombstone seqnum 10000
ASSERT_EQ(ToString(0), file_meta.smallest.user_key().ToString());
ASSERT_EQ(
"9999a",
file_meta.largest.user_key().ToString()); // range tombstone end key
ASSERT_EQ(1, file_meta.fd.smallest_seqno);
ASSERT_EQ(10000, file_meta.fd.largest_seqno); // range tombstone seqnum 10000
mock_table_factory_->AssertSingleFile(inserted_keys);
job_context.Clean();
}

@ -120,7 +120,7 @@ Status TableCache::GetTableReader(
s = ioptions_.table_factory->NewTableReader(
TableReaderOptions(ioptions_, prefix_extractor, env_options,
internal_comparator, skip_filters, immortal_tables_,
level),
level, fd.largest_seqno),
std::move(file_reader), fd.GetFileSize(), table_reader,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");

@ -35,11 +35,11 @@
namespace rocksdb {
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->largest_seqno != b->largest_seqno) {
return a->largest_seqno > b->largest_seqno;
if (a->fd.largest_seqno != b->fd.largest_seqno) {
return a->fd.largest_seqno > b->fd.largest_seqno;
}
if (a->smallest_seqno != b->smallest_seqno) {
return a->smallest_seqno > b->smallest_seqno;
if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
return a->fd.smallest_seqno > b->fd.smallest_seqno;
}
// Break ties by file number
return a->fd.GetNumber() > b->fd.GetNumber();
@ -162,22 +162,24 @@ class VersionBuilder::Rep {
abort();
}
if (f2->smallest_seqno == f2->largest_seqno) {
if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
// This is an external file that we ingested
SequenceNumber external_file_seqno = f2->smallest_seqno;
if (!(external_file_seqno < f1->largest_seqno ||
SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
if (!(external_file_seqno < f1->fd.largest_seqno ||
external_file_seqno == 0)) {
fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64
" vs. file with global_seqno %" PRIu64 "\n",
f1->smallest_seqno, f1->largest_seqno,
fprintf(stderr,
"L0 file with seqno %" PRIu64 " %" PRIu64
" vs. file with global_seqno %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
external_file_seqno);
abort();
}
} else if (f1->smallest_seqno <= f2->smallest_seqno) {
fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64
" vs. %" PRIu64 " %" PRIu64 "\n",
f1->smallest_seqno, f1->largest_seqno, f2->smallest_seqno,
f2->largest_seqno);
} else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
fprintf(stderr,
"L0 files seqno %" PRIu64 " %" PRIu64 " vs. %" PRIu64
" %" PRIu64 "\n",
f1->fd.smallest_seqno, f1->fd.largest_seqno,
f2->fd.smallest_seqno, f2->fd.largest_seqno);
abort();
}
} else {

@ -63,8 +63,8 @@ class VersionBuilderTest : public testing::Test {
f->fd = FileDescriptor(file_number, path_id, file_size);
f->smallest = GetInternalKey(smallest, smallest_seq);
f->largest = GetInternalKey(largest, largest_seq);
f->smallest_seqno = smallest_seqno;
f->largest_seqno = largest_seqno;
f->fd.smallest_seqno = smallest_seqno;
f->fd.largest_seqno = largest_seqno;
f->compensated_file_size = file_size;
f->refs = 0;
f->num_entries = num_entries;

@ -135,7 +135,7 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.fd.GetFileSize());
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutVarint64Varint64(dst, f.smallest_seqno, f.largest_seqno);
PutVarint64Varint64(dst, f.fd.smallest_seqno, f.fd.largest_seqno);
if (has_customized_fields) {
// Customized fields' format:
// +-----------------------------+
@ -233,14 +233,16 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t number;
uint32_t path_id = 0;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
// Since this is the only forward-compatible part of the code, we hack new
// extension into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) &&
GetVarint64(input, &f.smallest_seqno) &&
GetVarint64(input, &f.largest_seqno)) {
GetVarint64(input, &smallest_seqno) &&
GetVarint64(input, &largest_seqno)) {
// See comments in VersionEdit::EncodeTo() for format of customized fields
while (true) {
uint32_t custom_tag;
@ -289,7 +291,8 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
} else {
return "new-file4 entry";
}
f.fd = FileDescriptor(number, path_id, file_size);
f.fd =
FileDescriptor(number, path_id, file_size, smallest_seqno, largest_seqno);
new_files_.push_back(std::make_pair(level, f));
return nullptr;
}
@ -409,13 +412,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
case kNewFile2: {
uint64_t number;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
GetVarint64(&input, &file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest) &&
GetVarint64(&input, &f.smallest_seqno) &&
GetVarint64(&input, &f.largest_seqno)) {
f.fd = FileDescriptor(number, 0, file_size);
GetVarint64(&input, &smallest_seqno) &&
GetVarint64(&input, &largest_seqno)) {
f.fd = FileDescriptor(number, 0, file_size, smallest_seqno,
largest_seqno);
new_files_.push_back(std::make_pair(level, f));
} else {
if (!msg) {
@ -429,13 +435,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
uint64_t number;
uint32_t path_id;
uint64_t file_size;
SequenceNumber smallest_seqno;
SequenceNumber largest_seqno;
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest) &&
GetVarint64(&input, &f.smallest_seqno) &&
GetVarint64(&input, &f.largest_seqno)) {
f.fd = FileDescriptor(number, path_id, file_size);
GetVarint64(&input, &smallest_seqno) &&
GetVarint64(&input, &largest_seqno)) {
f.fd = FileDescriptor(number, path_id, file_size, smallest_seqno,
largest_seqno);
new_files_.push_back(std::make_pair(level, f));
} else {
if (!msg) {

@ -36,18 +36,28 @@ struct FileDescriptor {
TableReader* table_reader;
uint64_t packed_number_and_path_id;
uint64_t file_size; // File size in bytes
SequenceNumber smallest_seqno; // The smallest seqno in this file
SequenceNumber largest_seqno; // The largest seqno in this file
FileDescriptor() : FileDescriptor(0, 0, 0) {}
FileDescriptor(uint64_t number, uint32_t path_id, uint64_t _file_size)
: FileDescriptor(number, path_id, _file_size, kMaxSequenceNumber, 0) {}
FileDescriptor(uint64_t number, uint32_t path_id, uint64_t _file_size,
SequenceNumber _smallest_seqno, SequenceNumber _largest_seqno)
: table_reader(nullptr),
packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)),
file_size(_file_size) {}
file_size(_file_size),
smallest_seqno(_smallest_seqno),
largest_seqno(_largest_seqno) {}
FileDescriptor& operator=(const FileDescriptor& fd) {
table_reader = fd.table_reader;
packed_number_and_path_id = fd.packed_number_and_path_id;
file_size = fd.file_size;
smallest_seqno = fd.smallest_seqno;
largest_seqno = fd.largest_seqno;
return *this;
}
@ -77,8 +87,6 @@ struct FileMetaData {
FileDescriptor fd;
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
SequenceNumber smallest_seqno; // The smallest seqno in this file
SequenceNumber largest_seqno; // The largest seqno in this file
// Needs to be disposed when refs becomes 0.
Cache::Handle* table_reader_handle;
@ -108,9 +116,7 @@ struct FileMetaData {
// file.
FileMetaData()
: smallest_seqno(kMaxSequenceNumber),
largest_seqno(0),
table_reader_handle(nullptr),
: table_reader_handle(nullptr),
compensated_file_size(0),
num_entries(0),
num_deletions(0),
@ -128,8 +134,8 @@ struct FileMetaData {
smallest.DecodeFrom(key);
}
largest.DecodeFrom(key);
smallest_seqno = std::min(smallest_seqno, seqno);
largest_seqno = std::max(largest_seqno, seqno);
fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
fd.largest_seqno = std::max(fd.largest_seqno, seqno);
}
// Unlike UpdateBoundaries, ranges do not need to be presented in any
@ -143,8 +149,8 @@ struct FileMetaData {
if (largest.size() == 0 || icmp.Compare(largest, end) < 0) {
largest = end;
}
smallest_seqno = std::min(smallest_seqno, seqno);
largest_seqno = std::max(largest_seqno, seqno);
fd.smallest_seqno = std::min(fd.smallest_seqno, seqno);
fd.largest_seqno = std::max(fd.largest_seqno, seqno);
}
};
@ -233,17 +239,18 @@ class VersionEdit {
bool marked_for_compaction) {
assert(smallest_seqno <= largest_seqno);
FileMetaData f;
f.fd = FileDescriptor(file, file_path_id, file_size);
f.fd = FileDescriptor(file, file_path_id, file_size, smallest_seqno,
largest_seqno);
f.smallest = smallest;
f.largest = largest;
f.smallest_seqno = smallest_seqno;
f.largest_seqno = largest_seqno;
f.fd.smallest_seqno = smallest_seqno;
f.fd.largest_seqno = largest_seqno;
f.marked_for_compaction = marked_for_compaction;
new_files_.emplace_back(level, std::move(f));
}
void AddFile(int level, const FileMetaData& f) {
assert(f.smallest_seqno <= f.largest_seqno);
assert(f.fd.smallest_seqno <= f.fd.largest_seqno);
new_files_.emplace_back(level, f);
}

@ -897,8 +897,8 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
}
files.emplace_back(
MakeTableFileName("", file->fd.GetNumber()), file_path,
file->fd.GetFileSize(), file->smallest_seqno, file->largest_seqno,
file->smallest.user_key().ToString(),
file->fd.GetFileSize(), file->fd.smallest_seqno,
file->fd.largest_seqno, file->smallest.user_key().ToString(),
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
file->being_compacted);
@ -1891,13 +1891,15 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
case kOldestLargestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->largest_seqno < f2.file->largest_seqno;
return f1.file->fd.largest_seqno <
f2.file->fd.largest_seqno;
});
break;
case kOldestSmallestSeqFirst:
std::sort(temp.begin(), temp.end(),
[](const Fsize& f1, const Fsize& f2) -> bool {
return f1.file->smallest_seqno < f2.file->smallest_seqno;
return f1.file->fd.smallest_seqno <
f2.file->fd.smallest_seqno;
});
break;
case kMinOverlappingRatio:
@ -1981,17 +1983,17 @@ void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
bottommost_files_mark_threshold_ = kMaxSequenceNumber;
for (auto& level_and_file : bottommost_files_) {
if (!level_and_file.second->being_compacted &&
level_and_file.second->largest_seqno != 0 &&
level_and_file.second->fd.largest_seqno != 0 &&
level_and_file.second->num_deletions > 1) {
// largest_seqno might be nonzero due to containing the final key in an
// earlier compaction, whose seqnum we didn't zero out. Multiple deletions
// ensures the file really contains deleted or overwritten keys.
if (level_and_file.second->largest_seqno < oldest_snapshot_seqnum_) {
if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
bottommost_files_marked_for_compaction_.push_back(level_and_file);
} else {
bottommost_files_mark_threshold_ =
std::min(bottommost_files_mark_threshold_,
level_and_file.second->largest_seqno);
level_and_file.second->fd.largest_seqno);
}
}
}
@ -2417,7 +2419,7 @@ const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
int ret = snprintf(scratch->buffer + len, sz,
"#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
f->fd.GetNumber(), f->smallest_seqno, sztxt,
f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
static_cast<int>(f->being_compacted));
if (ret < 0 || ret >= sz)
break;
@ -3963,7 +3965,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
cfd->current()->storage_info()->LevelFiles(level)) {
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction);
}
}
@ -4288,8 +4290,8 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
filemetadata.size = file->fd.GetFileSize();
filemetadata.smallestkey = file->smallest.user_key().ToString();
filemetadata.largestkey = file->largest.user_key().ToString();
filemetadata.smallest_seqno = file->smallest_seqno;
filemetadata.largest_seqno = file->largest_seqno;
filemetadata.smallest_seqno = file->fd.smallest_seqno;
filemetadata.largest_seqno = file->fd.largest_seqno;
metadata->push_back(filemetadata);
}
}

@ -1267,6 +1267,18 @@ struct IngestExternalFileOptions {
// with allow_ingest_behind=true since the dawn of time.
// All files will be ingested at the bottommost level with seqno=0.
bool ingest_behind = false;
// Set to true if you would like to write global_seqno to a given offset in
// the external SST file for backward compatibility. Older versions of
// RocksDB writes a global_seqno to a given offset within ingested SST files,
// and new versions of RocksDB do not. If you ingest an external SST using
// new version of RocksDB and would like to be able to downgrade to an
// older version of RocksDB, you should set 'write_global_seqno' to true. If
// your service is just starting to use the new RocksDB, we recommend that
// you set this option to false, which brings two benefits:
// 1. No extra random write for global_seqno during ingestion.
// 2. Without writing external SST file, it's possible to do checksum.
// We have a plan to set this option to false by default in the future.
bool write_global_seqno = true;
};
} // namespace rocksdb

@ -203,7 +203,7 @@ Status BlockBasedTableFactory::NewTableReader(
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal,
&tail_prefetch_stats_);
table_reader_options.largest_seqno, &tail_prefetch_stats_);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(

@ -9,6 +9,7 @@
#include "table/block_based_table_reader.h"
#include <algorithm>
#include <array>
#include <limits>
#include <string>
#include <utility>
@ -663,51 +664,71 @@ bool IsFeatureSupported(const TableProperties& table_properties,
return true;
}
SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties,
Logger* info_log) {
auto& props = table_properties.user_collected_properties;
auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
// Caller has to ensure seqno is not nullptr.
Status GetGlobalSequenceNumber(const TableProperties& table_properties,
SequenceNumber largest_seqno,
SequenceNumber* seqno) {
const auto& props = table_properties.user_collected_properties;
const auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
const auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
*seqno = kDisableGlobalSequenceNumber;
if (version_pos == props.end()) {
if (seqno_pos != props.end()) {
std::array<char, 200> msg_buf;
// This is not an external sst file, global_seqno is not supported.
assert(false);
ROCKS_LOG_ERROR(
info_log,
snprintf(
msg_buf.data(), msg_buf.max_size(),
"A non-external sst file have global seqno property with value %s",
seqno_pos->second.c_str());
return Status::Corruption(msg_buf.data());
}
return kDisableGlobalSequenceNumber;
return Status::OK();
}
uint32_t version = DecodeFixed32(version_pos->second.c_str());
if (version < 2) {
if (seqno_pos != props.end() || version != 1) {
std::array<char, 200> msg_buf;
// This is a v1 external sst file, global_seqno is not supported.
assert(false);
ROCKS_LOG_ERROR(
info_log,
"An external sst file with version %u have global seqno property "
"with value %s",
version, seqno_pos->second.c_str());
snprintf(msg_buf.data(), msg_buf.max_size(),
"An external sst file with version %u have global seqno "
"property with value %s",
version, seqno_pos->second.c_str());
return Status::Corruption(msg_buf.data());
}
return kDisableGlobalSequenceNumber;
return Status::OK();
}
SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str());
// Since we have a plan to deprecate global_seqno, we do not return failure
// if seqno_pos == props.end(). We rely on version_pos to detect whether the
// SST is external.
SequenceNumber global_seqno(0);
if (seqno_pos != props.end()) {
global_seqno = DecodeFixed64(seqno_pos->second.c_str());
}
if (global_seqno != 0 && global_seqno != largest_seqno) {
std::array<char, 200> msg_buf;
snprintf(msg_buf.data(), msg_buf.max_size(),
"An external sst file with version %u have global seqno property "
"with value %s, while largest seqno in the file is %llu",
version, seqno_pos->second.c_str(),
static_cast<unsigned long long>(largest_seqno));
return Status::Corruption(msg_buf.data());
}
global_seqno = largest_seqno;
*seqno = largest_seqno;
if (global_seqno > kMaxSequenceNumber) {
assert(false);
ROCKS_LOG_ERROR(
info_log,
"An external sst file with version %u have global seqno property "
"with value %llu, which is greater than kMaxSequenceNumber",
version, global_seqno);
std::array<char, 200> msg_buf;
snprintf(msg_buf.data(), msg_buf.max_size(),
"An external sst file with version %u have global seqno property "
"with value %llu, which is greater than kMaxSequenceNumber",
version, static_cast<unsigned long long>(global_seqno));
return Status::Corruption(msg_buf.data());
}
return global_seqno;
return Status::OK();
}
} // namespace
@ -734,6 +755,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
const bool prefetch_index_and_filter_in_cache,
const bool skip_filters, const int level,
const bool immortal_table,
const SequenceNumber largest_seqno,
TailPrefetchStats* tail_prefetch_stats) {
table_reader->reset();
@ -936,8 +958,12 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
*(rep->table_properties),
BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
rep->ioptions.info_log);
s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno,
&(rep->global_seqno));
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str());
return s;
}
}
// Read the range del meta block

@ -95,6 +95,7 @@ class BlockBasedTable : public TableReader {
bool prefetch_index_and_filter_in_cache = true,
bool skip_filters = false, int level = -1,
const bool immortal_table = false,
const SequenceNumber largest_seqno = 0,
TailPrefetchStats* tail_prefetch_stats = nullptr);
bool PrefixMayMatch(const Slice& internal_key,

@ -13,6 +13,7 @@
#include <string>
#include <utility>
#include <vector>
#include "db/dbformat.h"
#include "db/table_properties_collector.h"
#include "options/cf_options.h"
#include "rocksdb/options.h"
@ -32,13 +33,25 @@ struct TableReaderOptions {
const InternalKeyComparator& _internal_comparator,
bool _skip_filters = false, bool _immortal = false,
int _level = -1)
: TableReaderOptions(_ioptions, _prefix_extractor, _env_options,
_internal_comparator, _skip_filters, _immortal,
_level, 0 /* _largest_seqno */) {}
// @param skip_filters Disables loading/accessing the filter block
TableReaderOptions(const ImmutableCFOptions& _ioptions,
const SliceTransform* _prefix_extractor,
const EnvOptions& _env_options,
const InternalKeyComparator& _internal_comparator,
bool _skip_filters, bool _immortal, int _level,
SequenceNumber _largest_seqno)
: ioptions(_ioptions),
prefix_extractor(_prefix_extractor),
env_options(_env_options),
internal_comparator(_internal_comparator),
skip_filters(_skip_filters),
immortal(_immortal),
level(_level) {}
level(_level),
largest_seqno(_largest_seqno) {}
const ImmutableCFOptions& ioptions;
const SliceTransform* prefix_extractor;
@ -50,6 +63,8 @@ struct TableReaderOptions {
bool immortal;
// what level this table/file is on, -1 for "not set, don't know"
int level;
// largest seqno in the table
SequenceNumber largest_seqno;
};
struct TableBuilderOptions {

@ -3132,7 +3132,14 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
// rocksdb still works.
}
TEST_P(BlockBasedTableTest, TableWithGlobalSeqno) {
/*
* Disable TableWithGlobalSeqno since RocksDB does not store global_seqno in
* the SST file any more. Instead, RocksDB deduces global_seqno from the
* MANIFEST while reading from an SST. Therefore, it's not possible to test the
* functionality of global_seqno in a single, isolated unit test without the
* involvement of Version, VersionSet, etc.
*/
TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
BlockBasedTableOptions bbto = GetBlockBasedTableOptions();
test::StringSink* sink = new test::StringSink();
unique_ptr<WritableFileWriter> file_writer(test::GetWritableFileWriter(sink));

@ -1634,8 +1634,8 @@ class StressTest {
snprintf(buf, 4, "%X", value[i]);
tmp.append(buf);
}
fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n",
cf, key, sz, tmp.c_str());
fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") %s\n", cf,
key, sz, tmp.c_str());
}
static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {

Loading…
Cancel
Save