Improve accuracy of I/O stats collection of external SST ingestion.

Summary:
RocksDB supports ingestion of external ssts. If ingestion_options.move_files is true, when performing ingestion, RocksDB first tries to link external ssts. If external SST file resides on a different FS, or the underlying FS does not support hard link, then RocksDB performs actual file copy. However, no matter which choice is made, current code increase bytes-written when updating compaction stats, which is inaccurate when RocksDB does NOT copy file.

Rename a sync point.
Closes https://github.com/facebook/rocksdb/pull/3713

Differential Revision: D7604151

Pulled By: riversand963

fbshipit-source-id: dd0c0d9b9a69c7d9ffceafc3d9c23371aa413586
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 3be9b36453
commit c81b0abedd
  1. 18
      db/external_sst_file_ingestion_job.cc
  2. 6
      db/external_sst_file_ingestion_job.h
  3. 52
      db/external_sst_file_test.cc

@ -103,12 +103,16 @@ Status ExternalSstFileIngestionJob::Prepare(
// Original file is on a different FS, use copy instead of hard linking
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
f.copy_file = true;
} else {
f.copy_file = false;
}
} else {
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
f.copy_file = true;
}
TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
if (!status.ok()) {
break;
}
@ -118,7 +122,7 @@ Status ExternalSstFileIngestionJob::Prepare(
if (!status.ok()) {
// We failed, remove all files that we copied into the db
for (IngestedFileInfo& f : files_to_ingest_) {
if (f.internal_file_path == "") {
if (f.internal_file_path.empty()) {
break;
}
Status s = env_->DeleteFile(f.internal_file_path);
@ -220,7 +224,15 @@ void ExternalSstFileIngestionJob::UpdateStats() {
for (IngestedFileInfo& f : files_to_ingest_) {
InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
stats.micros = total_time;
stats.bytes_written = f.fd.GetFileSize();
// If actual copy occured for this file, then we need to count the file
// size as the actual bytes written. If the file was linked, then we ignore
// the bytes written for file metadata.
// TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
if (f.copy_file) {
stats.bytes_written = f.fd.GetFileSize();
} else {
stats.bytes_moved = f.fd.GetFileSize();
}
stats.num_output_files = 1;
cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,

@ -46,11 +46,15 @@ struct IngestedFileInfo {
// FileDescriptor for the file inside the DB
FileDescriptor fd;
// file path that we picked for file inside the DB
std::string internal_file_path = "";
std::string internal_file_path;
// Global sequence number that we picked for the file inside the DB
SequenceNumber assigned_seqno = 0;
// Level inside the DB we picked for the external file.
int picked_level = 0;
// Whether to copy or link the external sst file. copy_file will be set to
// false if ingestion_options.move_files is true and underlying FS
// supports link operation.
bool copy_file;
InternalKey smallest_internal_key() const {
return InternalKey(smallest_user_key, assigned_seqno,

@ -689,7 +689,7 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) {
DestroyAndReopen(options);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AddFile:FileCopied", [&](void* /*arg*/) {
"ExternalSstFileIngestionJob::Prepare:FileAdded", [&](void* /* arg */) {
ASSERT_OK(Put("aaa", "bbb"));
ASSERT_OK(Flush());
ASSERT_OK(Put("aaa", "xxx"));
@ -1801,6 +1801,56 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
}
/*
* Test and verify the functionality of ingestion_options.move_files.
*/
TEST_F(ExternalSSTFileTest, LinkExternalSst) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
DestroyAndReopen(options);
const int kNumKeys = 10000;
std::string file_path = sst_files_dir_ + "file1.sst";
// Create SstFileWriter for default column family
SstFileWriter sst_file_writer(EnvOptions(), options);
ASSERT_OK(sst_file_writer.Open(file_path));
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_value"));
}
ASSERT_OK(sst_file_writer.Finish());
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(file_path, &file_size));
IngestExternalFileOptions ifo;
ifo.move_files = true;
ASSERT_OK(db_->IngestExternalFile({file_path}, ifo));
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
ColumnFamilyData* cfd = cfh->cfd();
const InternalStats* internal_stats_ptr = cfd->internal_stats();
const std::vector<InternalStats::CompactionStats>& comp_stats =
internal_stats_ptr->TEST_GetCompactionStats();
uint64_t bytes_copied = 0;
uint64_t bytes_moved = 0;
for (const auto& stats : comp_stats) {
bytes_copied += stats.bytes_written;
bytes_moved += stats.bytes_moved;
}
// If bytes_moved > 0, it means external sst resides on the same FS
// supporting hard link operation. Therefore,
// 0 bytes should be copied, and the bytes_moved == file_size.
// Otherwise, FS does not support hard link, or external sst file resides on
// a different file system, then the bytes_copied should be equal to
// file_size.
if (bytes_moved > 0) {
ASSERT_EQ(0, bytes_copied);
ASSERT_EQ(file_size, bytes_moved);
} else {
ASSERT_EQ(file_size, bytes_copied);
}
}
class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* /*db*/,

Loading…
Cancel
Save