From de7f423a82363b667e5bf5c714277135d2f79f39 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 9 Aug 2018 14:18:59 -0700 Subject: [PATCH] Add SST ingestion to ldb (#4205) Summary: We add two subcommands `write_extern_sst` and `ingest_extern_sst` to ldb. This PR avoids changing existing code because we hope to cherry-pick to earlier releases to support compatibility check for external SST file ingestion. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4205 Differential Revision: D9112711 Pulled By: riversand963 fbshipit-source-id: 7cae88380d4de86da8440230e87eca66755648e4 --- db/range_del_aggregator.cc | 3 +- include/rocksdb/utilities/ldb_cmd.h | 19 ++- tools/ldb_cmd.cc | 206 ++++++++++++++++++++++++++++ tools/ldb_cmd_impl.h | 53 +++++++ tools/ldb_test.py | 43 +++++- tools/ldb_tool.cc | 2 + 6 files changed, 313 insertions(+), 13 deletions(-) diff --git a/db/range_del_aggregator.cc b/db/range_del_aggregator.cc index c6b32d621..d816f3847 100644 --- a/db/range_del_aggregator.cc +++ b/db/range_del_aggregator.cc @@ -235,8 +235,7 @@ class CollapsedRangeDelMap : public RangeDelMap { } void AddTombstone(RangeTombstone t) { - if (ucmp_->Compare(t.start_key_, t.end_key_) >= 0 || - t.seq_ == 0) { + if (ucmp_->Compare(t.start_key_, t.end_key_) >= 0 || t.seq_ == 0) { // The tombstone covers no keys. Nothing to do. return; } diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index 4d91f0a66..907c9daf2 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -210,12 +210,20 @@ class LDBCommand { bool ParseStringOption(const std::map& options, const std::string& option, std::string* value); + /** + * Returns the value of the specified option as a boolean. + * default_val is used if the option is not found in options. + * Throws an exception if the value of the option is not + * "true" or "false" (case insensitive). + */ + bool ParseBooleanOption(const std::map& options, + const std::string& option, bool default_val); + Options options_; std::vector column_families_; LDBOptions ldb_options_; private: - friend class WALDumperCommand; /** * Interpret command line options and flags to determine if the key * should be input/output in hex. @@ -230,15 +238,6 @@ class LDBCommand { bool IsValueHex(const std::map& options, const std::vector& flags); - /** - * Returns the value of the specified option as a boolean. - * default_val is used if the option is not found in options. - * Throws an exception if the value of the option is not - * "true" or "false" (case insensitive). - */ - bool ParseBooleanOption(const std::map& options, - const std::string& option, bool default_val); - /** * Converts val to a boolean. * val must be either true or false (case insensitive). diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index ffcda844c..4492c6313 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == RestoreCommand::Name()) { return new RestoreCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); + } else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) { + return new WriteExternalSstFilesCommand(parsed_params.cmd_params, + parsed_params.option_map, + parsed_params.flags); + } else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) { + return new IngestExternalSstFilesCommand(parsed_params.cmd_params, + parsed_params.option_map, + parsed_params.flags); } return nullptr; } @@ -2939,5 +2947,203 @@ void DBFileDumperCommand::DoCommand() { } } +void WriteExternalSstFilesCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(WriteExternalSstFilesCommand::Name()); + ret.append(" "); + ret.append("\n"); +} + +WriteExternalSstFilesCommand::WriteExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags) + : LDBCommand( + options, flags, false /* is_read_only */, + BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM, + ARG_TO, ARG_CREATE_IF_MISSING})) { + create_if_missing_ = + IsFlagPresent(flags, ARG_CREATE_IF_MISSING) || + ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false); + if (params.size() != 1) { + exec_state_ = LDBCommandExecuteResult::Failed( + "output SST file path must be specified"); + } else { + output_sst_path_ = params.at(0); + } +} + +void WriteExternalSstFilesCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + ColumnFamilyHandle* cfh = GetCfHandle(); + SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh); + Status status = sst_file_writer.Open(output_sst_path_); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " + + status.ToString()); + return; + } + + int bad_lines = 0; + std::string line; + std::ifstream ifs_stdin("/dev/stdin"); + std::istream* istream_p = ifs_stdin.is_open() ? &ifs_stdin : &std::cin; + while (getline(*istream_p, line, '\n')) { + std::string key; + std::string value; + if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) { + status = sst_file_writer.Put(key, value); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "failed to write record to file: " + status.ToString()); + return; + } + } else if (0 == line.find("Keys in range:")) { + // ignore this line + } else if (0 == line.find("Created bg thread 0x")) { + // ignore this line + } else { + bad_lines++; + } + } + + status = sst_file_writer.Finish(); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "Failed to finish writing to file: " + status.ToString()); + return; + } + + if (bad_lines > 0) { + fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines); + } + exec_state_ = LDBCommandExecuteResult::Succeed( + "external SST file written to " + output_sst_path_); +} + +Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() { + Options opt = LDBCommand::PrepareOptionsForOpenDB(); + opt.create_if_missing = create_if_missing_; + return opt; +} + +const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files"; +const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY = + "snapshot_consistency"; +const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO = + "allow_global_seqno"; +const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH = + "allow_blocking_flush"; +const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND = + "ingest_behind"; +const std::string IngestExternalSstFilesCommand::ARG_WRITE_GLOBAL_SEQNO = + "write_global_seqno"; + +void IngestExternalSstFilesCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(IngestExternalSstFilesCommand::Name()); + ret.append(" "); + ret.append(" [--" + ARG_MOVE_FILES + "] "); + ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] "); + ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] "); + ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] "); + ret.append(" [--" + ARG_INGEST_BEHIND + "] "); + ret.append(" [--" + ARG_WRITE_GLOBAL_SEQNO + "] "); + ret.append("\n"); +} + +IngestExternalSstFilesCommand::IngestExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags) + : LDBCommand( + options, flags, false /* is_read_only */, + BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY, + ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING, + ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND, + ARG_WRITE_GLOBAL_SEQNO})), + move_files_(false), + snapshot_consistency_(true), + allow_global_seqno_(true), + allow_blocking_flush_(true), + ingest_behind_(false), + write_global_seqno_(true) { + create_if_missing_ = + IsFlagPresent(flags, ARG_CREATE_IF_MISSING) || + ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false); + move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) || + ParseBooleanOption(options, ARG_MOVE_FILES, false); + snapshot_consistency_ = + IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) || + ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true); + allow_global_seqno_ = + IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) || + ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true); + allow_blocking_flush_ = + IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) || + ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true); + ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) || + ParseBooleanOption(options, ARG_INGEST_BEHIND, false); + write_global_seqno_ = + IsFlagPresent(flags, ARG_WRITE_GLOBAL_SEQNO) || + ParseBooleanOption(options, ARG_WRITE_GLOBAL_SEQNO, true); + + if (allow_global_seqno_) { + if (!write_global_seqno_) { + fprintf(stderr, + "Warning: not writing global_seqno to the ingested SST can\n" + "prevent older versions of RocksDB from being able to open it\n"); + } + } else { + if (write_global_seqno_) { + exec_state_ = LDBCommandExecuteResult::Failed( + "ldb cannot write global_seqno to the ingested SST when global_seqno " + "is not allowed"); + } + } + + if (params.size() != 1) { + exec_state_ = + LDBCommandExecuteResult::Failed("input SST path must be specified"); + } else { + input_sst_path_ = params.at(0); + } +} + +void IngestExternalSstFilesCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + if (GetExecuteState().IsFailed()) { + return; + } + ColumnFamilyHandle* cfh = GetCfHandle(); + IngestExternalFileOptions ifo; + ifo.move_files = move_files_; + ifo.snapshot_consistency = snapshot_consistency_; + ifo.allow_global_seqno = allow_global_seqno_; + ifo.allow_blocking_flush = allow_blocking_flush_; + ifo.ingest_behind = ingest_behind_; + ifo.write_global_seqno = write_global_seqno_; + Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "failed to ingest external SST: " + status.ToString()); + } else { + exec_state_ = + LDBCommandExecuteResult::Succeed("external SST files ingested"); + } +} + +Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() { + Options opt = LDBCommand::PrepareOptionsForOpenDB(); + opt.create_if_missing = create_if_missing_; + return opt; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index e49e851b3..868c81f44 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -522,4 +522,57 @@ class RestoreCommand : public BackupableCommand { static void Help(std::string& ret); }; +class WriteExternalSstFilesCommand : public LDBCommand { + public: + static std::string Name() { return "write_extern_sst"; } + WriteExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags); + + virtual void DoCommand() override; + + virtual bool NoDBOpen() override { return false; } + + virtual Options PrepareOptionsForOpenDB() override; + + static void Help(std::string& ret); + + private: + std::string output_sst_path_; +}; + +class IngestExternalSstFilesCommand : public LDBCommand { + public: + static std::string Name() { return "ingest_extern_sst"; } + IngestExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags); + + virtual void DoCommand() override; + + virtual bool NoDBOpen() override { return false; } + + virtual Options PrepareOptionsForOpenDB() override; + + static void Help(std::string& ret); + + private: + std::string input_sst_path_; + bool move_files_; + bool snapshot_consistency_; + bool allow_global_seqno_; + bool allow_blocking_flush_; + bool ingest_behind_; + bool write_global_seqno_; + + static const std::string ARG_MOVE_FILES; + static const std::string ARG_SNAPSHOT_CONSISTENCY; + static const std::string ARG_ALLOW_GLOBAL_SEQNO; + static const std::string ARG_ALLOW_BLOCKING_FLUSH; + static const std::string ARG_INGEST_BEHIND; + static const std::string ARG_WRITE_GLOBAL_SEQNO; +}; + } // namespace rocksdb diff --git a/tools/ldb_test.py b/tools/ldb_test.py index fa0ded438..2200fb464 100644 --- a/tools/ldb_test.py +++ b/tools/ldb_test.py @@ -76,7 +76,7 @@ class LDBTestCase(unittest.TestCase): my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \ thread\"" % params, shell=True) - except Exception, e: + except Exception: return self.fail( "Exception should have been raised for command with params: %s" % @@ -146,6 +146,14 @@ class LDBTestCase(unittest.TestCase): def loadDb(self, params, dumpFile): return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params)) + def writeExternSst(self, params, inputDumpFile, outputSst): + return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s" + % (inputDumpFile, outputSst, params)) + + def ingestExternSst(self, params, inputSst): + return 0 == run_err_null("./ldb ingest_extern_sst %s %s" + % (inputSst, params)) + def testStringBatchPut(self): print "Running testStringBatchPut..." self.assertRunOK("batchput x1 y1 --create_if_missing", "OK") @@ -547,5 +555,38 @@ class LDBTestCase(unittest.TestCase): # non-existing column family. self.assertRunFAIL("get cf3_1 --column_family=four") + def testIngestExternalSst(self): + print "Running testIngestExternalSst..." + + # Dump, load, write external sst and ingest it in another db + dbPath = os.path.join(self.TMP_DIR, "db1") + self.assertRunOK( + "batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4" + % dbPath, + "OK") + self.assertRunOK("scan --db=%s" % dbPath, + "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4") + dumpFilePath = os.path.join(self.TMP_DIR, "dump1") + with open(dumpFilePath, 'w') as f: + f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40") + externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst") + self.assertTrue(self.writeExternSst("--create_if_missing --db=%s" + % dbPath, + dumpFilePath, + externSstPath)) + # cannot ingest if allow_global_seqno is false + self.assertFalse( + self.ingestExternSst( + "--create_if_missing --allow_global_seqno=false --db=%s" + % dbPath, + externSstPath)) + self.assertTrue( + self.ingestExternSst( + "--create_if_missing --allow_global_seqno --db=%s" + % dbPath, + externSstPath)) + self.assertRunOKFull("scan --db=%s" % dbPath, + "x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40") + if __name__ == "__main__": unittest.main() diff --git a/tools/ldb_tool.cc b/tools/ldb_tool.cc index b09076ecc..fe307eab7 100644 --- a/tools/ldb_tool.cc +++ b/tools/ldb_tool.cc @@ -88,6 +88,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options, BackupCommand::Help(ret); RestoreCommand::Help(ret); CheckPointCommand::Help(ret); + WriteExternalSstFilesCommand::Help(ret); + IngestExternalSstFilesCommand::Help(ret); fprintf(stderr, "%s\n", ret.c_str()); }