Allow user to specify a CF for SST files generated by SstFileWriter

Summary:
Allow user to explicitly specify that the generated file by SstFileWriter will be ingested in a specific CF.
This allow us to persist the CF id in the generated file
Closes https://github.com/facebook/rocksdb/pull/1615

Differential Revision: D4270422

Pulled By: IslamAbdelRahman

fbshipit-source-id: 7fb954e
main
Islam AbdelRahman 8 years ago committed by Facebook Github Bot
parent 9053fe2a5c
commit 67f37cf198
  1. 11
      db/external_sst_file_ingestion_job.cc
  2. 2
      db/external_sst_file_ingestion_job.h
  3. 55
      db/external_sst_file_test.cc
  4. 7
      include/rocksdb/sst_file_writer.h
  5. 29
      table/sst_file_writer.cc

@ -38,6 +38,15 @@ Status ExternalSstFileIngestionJob::Prepare(
files_to_ingest_.push_back(file_to_ingest);
}
for (const IngestedFileInfo& f : files_to_ingest_) {
if (f.cf_id !=
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
f.cf_id != cfd_->GetID()) {
return Status::InvalidArgument(
"External file column family id dont match");
}
}
const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
auto num_files = files_to_ingest_.size();
if (num_files == 0) {
@ -325,6 +334,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
}
file_to_ingest->largest_user_key = key.user_key.ToString();
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
return status;
}

@ -36,6 +36,8 @@ struct IngestedFileInfo {
uint64_t file_size;
// total number of keys in external file
uint64_t num_entries;
// Id of column family this file shoule be ingested into
uint32_t cf_id;
// Version of external file
int version;

@ -1780,6 +1780,61 @@ TEST_F(ExternalSSTFileTest, DirtyExit) {
ASSERT_NOK(sst_file_writer->Finish());
}
TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko", "toto"}, options);
SstFileWriter sfw_default(EnvOptions(), options, options.comparator,
handles_[0]);
SstFileWriter sfw_cf1(EnvOptions(), options, options.comparator, handles_[1]);
SstFileWriter sfw_cf2(EnvOptions(), options, options.comparator, handles_[2]);
SstFileWriter sfw_unknown(EnvOptions(), options, options.comparator);
// default_cf.sst
const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst";
ASSERT_OK(sfw_default.Open(cf_default_sst));
ASSERT_OK(sfw_default.Add("K1", "V1"));
ASSERT_OK(sfw_default.Add("K2", "V2"));
ASSERT_OK(sfw_default.Finish());
// cf1.sst
const std::string cf1_sst = sst_files_dir_ + "/cf1.sst";
ASSERT_OK(sfw_cf1.Open(cf1_sst));
ASSERT_OK(sfw_cf1.Add("K3", "V1"));
ASSERT_OK(sfw_cf1.Add("K4", "V2"));
ASSERT_OK(sfw_cf1.Finish());
// cf_unknown.sst
const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst";
ASSERT_OK(sfw_unknown.Open(unknown_sst));
ASSERT_OK(sfw_unknown.Add("K5", "V1"));
ASSERT_OK(sfw_unknown.Add("K6", "V2"));
ASSERT_OK(sfw_unknown.Finish());
IngestExternalFileOptions ifo;
// SST CF dont match
ASSERT_NOK(db_->IngestExternalFile(handles_[0], {cf1_sst}, ifo));
// SST CF dont match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf1_sst}, ifo));
// SST CF match
ASSERT_OK(db_->IngestExternalFile(handles_[1], {cf1_sst}, ifo));
// SST CF dont match
ASSERT_NOK(db_->IngestExternalFile(handles_[1], {cf_default_sst}, ifo));
// SST CF dont match
ASSERT_NOK(db_->IngestExternalFile(handles_[2], {cf_default_sst}, ifo));
// SST CF match
ASSERT_OK(db_->IngestExternalFile(handles_[0], {cf_default_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[1], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[2], {unknown_sst}, ifo));
// SST CF unknown
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -7,6 +7,7 @@
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
namespace rocksdb {
@ -43,8 +44,12 @@ struct ExternalSstFileInfo {
// All keys in files generated by SstFileWriter will have sequence number = 0
class SstFileWriter {
public:
// User can pass `column_family` to specify that the the generated file will
// be ingested into this column_family, note that passing nullptr means that
// the column_family is unknown.
SstFileWriter(const EnvOptions& env_options, const Options& options,
const Comparator* user_comparator);
const Comparator* user_comparator,
ColumnFamilyHandle* column_family = nullptr);
~SstFileWriter();

@ -21,11 +21,12 @@ const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options,
const Comparator* _user_comparator)
const Comparator* _user_comparator, ColumnFamilyHandle* _cfh)
: env_options(_env_options),
ioptions(options),
mutable_cf_options(options),
internal_comparator(_user_comparator) {}
internal_comparator(_user_comparator),
cfh(_cfh) {}
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
@ -34,14 +35,16 @@ struct SstFileWriter::Rep {
MutableCFOptions mutable_cf_options;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
std::string column_family_name;
InternalKey ikey;
std::string column_family_name;
ColumnFamilyHandle* cfh;
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const Options& options,
const Comparator* user_comparator)
: rep_(new Rep(env_options, options, user_comparator)) {}
const Comparator* user_comparator,
ColumnFamilyHandle* column_family)
: rep_(new Rep(env_options, options, user_comparator, column_family)) {}
SstFileWriter::~SstFileWriter() {
if (rep_->builder) {
@ -89,6 +92,18 @@ Status SstFileWriter::Open(const std::string& file_path) {
user_collector_factories[i]));
}
int unknown_level = -1;
uint32_t cf_id;
if (r->cfh != nullptr) {
// user explicitly specified that this file will be ingested into cfh,
// we can persist this information in the file.
cf_id = r->cfh->GetID();
r->column_family_name = r->cfh->GetName();
} else {
r->column_family_name = "";
cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
}
TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts,
@ -100,9 +115,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
r->file_writer.get()));
table_builder_options, cf_id, r->file_writer.get()));
r->file_info.file_path = file_path;
r->file_info.file_size = 0;

Loading…
Cancel
Save