Add experimental DB::AddFile() to plug sst files into empty DB

Summary:
This is an initial version of bulk load feature

This diff allow us to create sst files, and then bulk load them later, right now the restrictions for loading an sst file are
(1) Memtables are empty
(2) Added sst files have sequence number = 0, and existing values in database have sequence number = 0
(3) Added sst files values are not overlapping

Test Plan: unit testing

Reviewers: igor, ott, sdong

Reviewed By: sdong

Subscribers: leveldb, ott, dhruba

Differential Revision: https://reviews.facebook.net/D39081
main
Islam AbdelRahman 9 years ago
parent 3fdb6e5234
commit f03b5c987b
  1. 3
      HISTORY.md
  2. 216
      db/db_impl.cc
  3. 8
      db/db_impl.h
  4. 325
      db/db_test.cc
  5. 2
      db/table_properties_collector.h
  6. 31
      include/rocksdb/db.h
  7. 77
      include/rocksdb/sst_file_writer.h
  8. 6
      include/rocksdb/table.h
  9. 12
      include/rocksdb/utilities/stackable_db.h
  10. 1
      src.mk
  11. 188
      table/sst_file_writer.cc

@ -3,9 +3,12 @@
## Unreleased
### New Features
* Added single delete operation as a more efficient way to delete keys that have not been overwritten.
* Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info.
### Public API Changes
* Added SingleDelete() to the DB interface.
* Added AddFile() to DB interface.
* Added SstFileWriter class.
## 4.0.0 (9/9/2015)
### New Features

@ -58,6 +58,7 @@
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
@ -3071,6 +3072,221 @@ std::vector<Status> DBImpl::MultiGet(
return stat_list;
}
#ifndef ROCKSDB_LITE
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path, bool move_file) {
Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
ExternalSstFileInfo file_info;
file_info.file_path = file_path;
status = env_->GetFileSize(file_path, &file_info.file_size);
if (!status.ok()) {
return status;
}
// Access the file using TableReader to extract
// version, number of entries, smallest user key, largest user key
std::unique_ptr<RandomAccessFile> sst_file;
status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_);
if (!status.ok()) {
return status;
}
std::unique_ptr<RandomAccessFileReader> sst_file_reader;
sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file)));
std::unique_ptr<TableReader> table_reader;
status = cfd->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd->ioptions(), env_options_,
cfd->internal_comparator()),
std::move(sst_file_reader), file_info.file_size, &table_reader);
if (!status.ok()) {
return status;
}
// Get the external sst file version from table properties
const UserCollectedProperties& user_collected_properties =
table_reader->GetTableProperties()->user_collected_properties;
UserCollectedProperties::const_iterator external_sst_file_version_iter =
user_collected_properties.find(ExternalSstFilePropertyNames::kVersion);
if (external_sst_file_version_iter == user_collected_properties.end()) {
return Status::InvalidArgument("Generated table version not found");
}
file_info.version =
DecodeFixed32(external_sst_file_version_iter->second.c_str());
if (file_info.version == 1) {
// version 1 imply that all sequence numbers in table equal 0
file_info.sequence_number = 0;
} else {
return Status::InvalidArgument("Generated table version is not supported");
}
// Get number of entries in table
file_info.num_entries = table_reader->GetTableProperties()->num_entries;
ParsedInternalKey key;
std::unique_ptr<Iterator> iter(table_reader->NewIterator(ReadOptions()));
// Get first (smallest) key from file
iter->SeekToFirst();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("Generated table have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("Generated table have non zero sequence number");
}
file_info.smallest_key = key.user_key.ToString();
// Get last (largest) key from file
iter->SeekToLast();
if (!ParseInternalKey(iter->key(), &key)) {
return Status::Corruption("Generated table have corrupted keys");
}
if (key.sequence != 0) {
return Status::Corruption("Generated table have non zero sequence number");
}
file_info.largest_key = key.user_key.ToString();
return AddFile(column_family, &file_info, move_file);
}
Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_info, bool move_file) {
Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
if (cfd->NumberLevels() <= 1) {
return Status::NotSupported(
"AddFile requires a database with at least 2 levels");
}
if (file_info->version != 1) {
return Status::InvalidArgument("Generated table version is not supported");
}
// version 1 imply that file have only Put Operations with Sequence Number = 0
FileMetaData meta;
meta.smallest =
InternalKey(file_info->smallest_key, file_info->sequence_number,
ValueType::kTypeValue);
meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number,
ValueType::kTypeValue);
if (!meta.smallest.Valid() || !meta.largest.Valid()) {
return Status::Corruption("Generated table have corrupted keys");
}
meta.smallest_seqno = file_info->sequence_number;
meta.largest_seqno = file_info->sequence_number;
if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) {
return Status::InvalidArgument(
"Non zero sequence numbers are not supported");
}
// Generate a location for the new table
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size);
std::string db_fname = TableFileName(
db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
if (move_file) {
status = env_->LinkFile(file_info->file_path, db_fname);
if (status.IsNotSupported()) {
// Original file is on a different FS, use copy instead of hard linking
status = CopyFile(env_, file_info->file_path, db_fname, 0);
}
} else {
status = CopyFile(env_, file_info->file_path, db_fname, 0);
}
if (!status.ok()) {
return status;
}
{
InstrumentedMutexLock l(&mutex_);
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_);
// Make sure memtables are empty
if (!cfd->mem()->IsEmpty() || cfd->imm()->NumNotFlushed() > 0) {
// Cannot add the file since the keys in memtable
// will hide the keys in file
status = Status::NotSupported("Memtable is not empty");
}
// Make sure last sequence number is 0, if there are existing files then
// they should have sequence number = 0
if (status.ok() && versions_->LastSequence() > 0) {
status = Status::NotSupported("Last Sequence number is not zero");
}
auto* vstorage = cfd->current()->storage_info();
if (status.ok()) {
// Make sure that the key range in the file we will add does not overlap
// with previously added files
Slice smallest_user_key = meta.smallest.user_key();
Slice largest_user_key = meta.largest.user_key();
for (int level = 0; level < vstorage->num_non_empty_levels(); level++) {
if (vstorage->OverlapInLevel(level, &smallest_user_key,
&largest_user_key)) {
status = Status::NotSupported("Cannot add overlapping files");
break;
}
}
}
if (status.ok()) {
// We add the file to the last level
int target_level = cfd->NumberLevels() - 1;
if (cfd->ioptions()->level_compaction_dynamic_level_bytes == false) {
// If we are using dynamic level compaction we add the file to
// last level with files
target_level = vstorage->num_non_empty_levels() - 1;
if (target_level <= 0) {
target_level = 1;
}
}
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno,
meta.marked_for_compaction);
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir());
}
write_thread_.ExitUnbatched(&w);
if (status.ok()) {
delete InstallSuperVersionAndScheduleWork(cfd, nullptr,
mutable_cf_options);
}
}
if (!status.ok()) {
// We failed to add the file to the database
Status s = env_->DeleteFile(db_fname);
if (!s.ok()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"AddFile() clean up for file %s failed : %s", db_fname.c_str(),
s.ToString().c_str());
}
} else if (status.ok() && move_file) {
// The file was moved and added successfully, remove original file link
Status s = env_->DeleteFile(file_info->file_path);
if (!s.ok()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"%s was added to DB successfully but failed to remove original file "
"link : %s",
file_info->file_path.c_str(), s.ToString().c_str());
}
}
return status;
}
#endif // ROCKSDB_LITE
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) {

@ -55,6 +55,7 @@ class CompactionFilterV2;
class Arena;
class WriteCallback;
struct JobContext;
struct ExternalSstFileInfo;
class DBImpl : public DB {
public:
@ -225,6 +226,13 @@ class DBImpl : public DB {
Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key,
SequenceNumber* seq);
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_info,
bool move_file) override;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path, bool move_file) override;
#endif // ROCKSDB_LITE
// checks if all live files exist on file system and that their file sizes

@ -44,6 +44,7 @@
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
@ -5548,6 +5549,18 @@ class ModelDB: public DB {
return s;
}
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_path,
bool move_file) override {
return Status::NotSupported("Not implemented.");
}
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path,
bool move_file) override {
return Status::NotSupported("Not implemented.");
}
using DB::GetPropertiesOfAllTables;
virtual Status GetPropertiesOfAllTables(
ColumnFamilyHandle* column_family,
@ -9301,6 +9314,318 @@ TEST_F(DBTest, GetTotalSstFilesSizeVersionsFilesShared) {
ASSERT_EQ(total_sst_files_size, 0);
}
TEST_F(DBTest, AddExternalSstFile) {
do {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
options.env = env_;
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
// file1.sst (0 => 99)
std::string file1 = sst_files_folder + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 0; k < 100; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Add(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
// file2.sst (100 => 199)
std::string file2 = sst_files_folder + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
for (int k = 100; k < 200; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
// Cannot add this key because it's not after last added key
s = sst_file_writer.Add(Key(99), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 100);
ASSERT_EQ(file2_info.smallest_key, Key(100));
ASSERT_EQ(file2_info.largest_key, Key(199));
// file3.sst (195 => 299)
// This file values overlap with file2 values
std::string file3 = sst_files_folder + "file3.sst";
ASSERT_OK(sst_file_writer.Open(file3));
for (int k = 195; k < 300; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 105);
ASSERT_EQ(file3_info.smallest_key, Key(195));
ASSERT_EQ(file3_info.largest_key, Key(299));
// file4.sst (30 => 39)
// This file values overlap with file1 values
std::string file4 = sst_files_folder + "file4.sst";
ASSERT_OK(sst_file_writer.Open(file4));
for (int k = 30; k < 40; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file4_info;
s = sst_file_writer.Finish(&file4_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file4_info.file_path, file4);
ASSERT_EQ(file4_info.num_entries, 10);
ASSERT_EQ(file4_info.smallest_key, Key(30));
ASSERT_EQ(file4_info.largest_key, Key(39));
// file5.sst (400 => 499)
std::string file5 = sst_files_folder + "file5.sst";
ASSERT_OK(sst_file_writer.Open(file5));
for (int k = 400; k < 500; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file5_info;
s = sst_file_writer.Finish(&file5_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file5_info.file_path, file5);
ASSERT_EQ(file5_info.num_entries, 100);
ASSERT_EQ(file5_info.smallest_key, Key(400));
ASSERT_EQ(file5_info.largest_key, Key(499));
DestroyAndReopen(options);
// Add file using file path
s = db_->AddFile(file1);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 100; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
}
// Add file using file info
s = db_->AddFile(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 200; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
}
// This file have overlapping values with the exisitng data
s = db_->AddFile(file3);
ASSERT_FALSE(s.ok()) << s.ToString();
// This file have overlapping values with the exisitng data
s = db_->AddFile(&file4_info);
ASSERT_FALSE(s.ok()) << s.ToString();
// Overwrite values of keys divisible by 5
for (int k = 0; k < 200; k += 5) {
ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
}
ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
// DB have values in memtable now, we cannot add files anymore
s = db_->AddFile(file5);
ASSERT_FALSE(s.ok()) << s.ToString();
// Make sure values are correct before and after flush/compaction
for (int i = 0; i < 2; i++) {
for (int k = 0; k < 200; k++) {
std::string value = Key(k) + "_val";
if (k % 5 == 0) {
value += "_new";
}
ASSERT_EQ(Get(Key(k)), value);
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
// DB sequence number is not zero, cannot add files anymore
s = db_->AddFile(file5);
ASSERT_FALSE(s.ok()) << s.ToString();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
}
TEST_F(DBTest, AddExternalSstFileNoCopy) {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
options.env = env_;
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
// file1.sst (0 => 99)
std::string file1 = sst_files_folder + "file1.sst";
ASSERT_OK(sst_file_writer.Open(file1));
for (int k = 0; k < 100; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
ASSERT_EQ(file1_info.largest_key, Key(99));
// file2.sst (100 => 299)
std::string file2 = sst_files_folder + "file2.sst";
ASSERT_OK(sst_file_writer.Open(file2));
for (int k = 100; k < 300; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
}
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 200);
ASSERT_EQ(file2_info.smallest_key, Key(100));
ASSERT_EQ(file2_info.largest_key, Key(299));
// file3.sst (110 => 124) .. overlap with file2.sst
std::string file3 = sst_files_folder + "file3.sst";
ASSERT_OK(sst_file_writer.Open(file3));
for (int k = 110; k < 125; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 15);
ASSERT_EQ(file3_info.smallest_key, Key(110));
ASSERT_EQ(file3_info.largest_key, Key(124));
s = db_->AddFile(&file1_info, true /* move file */);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));
s = db_->AddFile(&file2_info, false /* copy file */);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(env_->FileExists(file2));
// This file have overlapping values with the exisitng data
s = db_->AddFile(&file3_info, true /* move file */);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_OK(env_->FileExists(file3));
for (int k = 0; k < 300; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
}
}
TEST_F(DBTest, AddExternalSstFileMultiThreaded) {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
// Bulk load 10 files every file contain 1000 keys
int num_files = 10;
int keys_per_file = 1000;
// Generate file names
std::vector<std::string> file_names;
for (int i = 0; i < num_files; i++) {
std::string file_name = "file_" + ToString(i) + ".sst";
file_names.push_back(sst_files_folder + file_name);
}
do {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
const ImmutableCFOptions ioptions(options);
std::atomic<int> thread_num(0);
std::function<void()> write_file_func = [&]() {
int file_idx = thread_num.fetch_add(1);
int range_start = file_idx * keys_per_file;
int range_end = range_start + keys_per_file;
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));
for (int k = range_start; k < range_end; k++) {
ASSERT_OK(sst_file_writer.Add(Key(k), Key(k)));
}
Status s = sst_file_writer.Finish();
ASSERT_TRUE(s.ok()) << s.ToString();
};
// Write num_files files in parallel
std::vector<std::thread> sst_writer_threads;
for (int i = 0; i < num_files; ++i) {
sst_writer_threads.emplace_back(write_file_func);
}
for (auto& t : sst_writer_threads) {
t.join();
}
fprintf(stderr, "Wrote %d files (%d keys)\n", num_files,
num_files * keys_per_file);
thread_num.store(0);
std::atomic<int> files_added(0);
std::function<void()> load_file_func = [&]() {
// We intentionally add every file twice, and assert that it was added
// only once and the other add failed
int thread_id = thread_num.fetch_add(1);
int file_idx = thread_id / 2;
// sometimes we use copy, sometimes link .. the result should be the same
bool move_file = (thread_id % 3 == 0);
Status s = db_->AddFile(file_names[file_idx], move_file);
if (s.ok()) {
files_added++;
}
};
// Bulk load num_files files in parallel
std::vector<std::thread> add_file_threads;
DestroyAndReopen(options);
for (int i = 0; i < num_files * 2; ++i) {
add_file_threads.emplace_back(load_file_func);
}
for (auto& t : add_file_threads) {
t.join();
}
ASSERT_EQ(files_added.load(), num_files);
fprintf(stderr, "Loaded %d files (%d keys)\n", num_files,
num_files * keys_per_file);
// Overwrite values of keys divisible by 100
for (int k = 0; k < num_files * keys_per_file; k += 100) {
std::string key = Key(k);
Status s = Put(key, key + "_new");
ASSERT_TRUE(s.ok());
}
for (int i = 0; i < 2; i++) {
// Make sure the values are correct before and after flush/compaction
for (int k = 0; k < num_files * keys_per_file; ++k) {
std::string key = Key(k);
std::string value = (k % 100 == 0) ? (key + "_new") : key;
ASSERT_EQ(Get(key), value);
}
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
}
fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
}
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4));

@ -36,7 +36,7 @@ class IntTblPropCollector {
virtual bool NeedCompact() const { return false; }
};
// Facrtory for internal table properties collector.
// Factory for internal table properties collector.
class IntTblPropCollectorFactory {
public:
virtual ~IntTblPropCollectorFactory() {}

@ -42,6 +42,7 @@ struct FlushOptions;
struct CompactionOptions;
struct CompactRangeOptions;
struct TableProperties;
struct ExternalSstFileInfo;
class WriteBatch;
class Env;
class EventListener;
@ -655,6 +656,36 @@ class DB {
ColumnFamilyMetaData* metadata) {
GetColumnFamilyMetaData(DefaultColumnFamily(), metadata);
}
// Load table file located at "file_path" into "column_family", a pointer to
// ExternalSstFileInfo can be used instead of "file_path" to do a blind add
// that wont need to read the file, move_file can be set to true to
// move the file instead of copying it.
//
// Current Requirements:
// (1) Memtable is empty.
// (2) All existing files (if any) have sequence number = 0.
// (3) Key range in loaded table file don't overlap with existing
// files key ranges.
// (4) No other writes happen during AddFile call, otherwise
// DB may get corrupted.
// (5) Database have at least 2 levels.
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path,
bool move_file = false) = 0;
virtual Status AddFile(const std::string& file_path, bool move_file = false) {
return AddFile(DefaultColumnFamily(), file_path, move_file);
}
// Load table file with information "file_info" into "column_family"
virtual Status AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_info,
bool move_file = false) = 0;
virtual Status AddFile(const ExternalSstFileInfo* file_info,
bool move_file = false) {
return AddFile(DefaultColumnFamily(), file_info, move_file);
}
#endif // ROCKSDB_LITE
// Sets the globally unique ID created at database creation time by invoking

@ -0,0 +1,77 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/types.h"
namespace rocksdb {
class Comparator;
// Table Properties that are specific to tables created by SstFileWriter.
struct ExternalSstFilePropertyNames {
// value of this property is a fixed int32 number.
static const std::string kVersion;
};
// ExternalSstFileInfo include information about sst files created
// using SstFileWriter
struct ExternalSstFileInfo {
ExternalSstFileInfo() {}
ExternalSstFileInfo(const std::string& _file_path,
const std::string& _smallest_key,
const std::string& _largest_key,
SequenceNumber _sequence_number, uint64_t _file_size,
int32_t _num_entries, int32_t _version)
: file_path(_file_path),
smallest_key(_smallest_key),
largest_key(_largest_key),
sequence_number(_sequence_number),
file_size(_file_size),
num_entries(_num_entries),
version(_version) {}
std::string file_path; // external sst file path
std::string smallest_key; // smallest user key in file
std::string largest_key; // largest user key in file
SequenceNumber sequence_number; // sequence number of all keys in file
uint64_t file_size; // file size in bytes
uint64_t num_entries; // number of entries in file
int32_t version; // file version
};
// SstFileWriter is used to create sst files that can be added to database later
// All keys in files generated by SstFileWriter will have sequence number = 0
class SstFileWriter {
public:
SstFileWriter(const EnvOptions& env_options,
const ImmutableCFOptions& ioptions,
const Comparator* user_comparator);
~SstFileWriter();
// Prepare SstFileWriter to write into file located at "file_path".
Status Open(const std::string& file_path);
// Add key, value to currently opened file
// REQUIRES: key is after any previously added key according to comparator.
Status Add(const Slice& user_key, const Slice& value);
// Finalize writing to sst file and close file.
//
// An optional ExternalSstFileInfo pointer can be passed to the function
// which will be populated with information about the created sst file
Status Finish(ExternalSstFileInfo* file_info = nullptr);
private:
class SstFileWriterPropertiesCollectorFactory;
class SstFileWriterPropertiesCollector;
struct Rep;
Rep* rep_;
};
} // namespace rocksdb

@ -336,11 +336,13 @@ class TableFactory {
// in parameter file. It's the caller's responsibility to make sure
// file is in the correct format.
//
// NewTableReader() is called in two places:
// NewTableReader() is called in three places:
// (1) TableCache::FindTable() calls the function when table cache miss
// and cache the table object returned.
// (1) SstFileReader (for SST Dump) opens the table and dump the table
// (2) SstFileReader (for SST Dump) opens the table and dump the table
// contents using the interator of the table.
// (3) DBImpl::AddFile() calls this function to read the contents of
// the sst file it's attempting to add
//
// table_reader_options is a TableReaderOptions which contain all the
// needed parameters and configuration to open the table.

@ -63,6 +63,18 @@ class StackableDB : public DB {
return db_->MultiGet(options, column_family, keys, values);
}
using DB::AddFile;
virtual Status AddFile(ColumnFamilyHandle* column_family,
const ExternalSstFileInfo* file_info,
bool move_file) override {
return db_->AddFile(column_family, file_info, move_file);
}
virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path,
bool move_file) override {
return db_->AddFile(column_family, file_path, move_file);
}
using DB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,

@ -68,6 +68,7 @@ LIB_SOURCES = \
table/iterator.cc \
table/merger.cc \
table/meta_blocks.cc \
table/sst_file_writer.cc \
table/plain_table_builder.cc \
table/plain_table_factory.cc \
table/plain_table_index.cc \

@ -0,0 +1,188 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "rocksdb/sst_file_writer.h"
#include <vector>
#include "db/dbformat.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/file_reader_writer.h"
#include "util/string_util.h"
namespace rocksdb {
const std::string ExternalSstFilePropertyNames::kVersion =
"rocksdb.external_sst_file.version";
// PropertiesCollector used to add properties specific to tables
// generated by SstFileWriter
class SstFileWriter::SstFileWriterPropertiesCollector
: public IntTblPropCollector {
public:
explicit SstFileWriterPropertiesCollector(int32_t version)
: version_(version) {}
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) override {
std::string version_val;
PutFixed32(&version_val, static_cast<int32_t>(version_));
properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
return Status::OK();
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
virtual UserCollectedProperties GetReadableProperties() const override {
return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
}
private:
int32_t version_;
};
class SstFileWriter::SstFileWriterPropertiesCollectorFactory
: public IntTblPropCollectorFactory {
public:
explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
: version_(version) {}
virtual IntTblPropCollector* CreateIntTblPropCollector() override {
return new SstFileWriterPropertiesCollector(version_);
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
private:
int32_t version_;
};
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions,
const Comparator* _user_comparator)
: env_options(_env_options),
ioptions(_ioptions),
internal_comparator(_user_comparator) {}
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
EnvOptions env_options;
ImmutableCFOptions ioptions;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const ImmutableCFOptions& ioptions,
const Comparator* user_comparator)
: rep_(new Rep(env_options, ioptions, user_comparator)) {}
SstFileWriter::~SstFileWriter() { delete rep_; }
Status SstFileWriter::Open(const std::string& file_path) {
Rep* r = rep_;
Status s;
std::unique_ptr<WritableFile> sst_file;
s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
if (!s.ok()) {
return s;
}
CompressionType compression_type = r->ioptions.compression;
if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
}
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(1 /* version */));
TableBuilderOptions table_builder_options(
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
compression_type, r->ioptions.compression_opts, false);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options));
r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options, r->file_writer.get()));
r->file_info.file_path = file_path;
r->file_info.file_size = 0;
r->file_info.num_entries = 0;
r->file_info.sequence_number = 0;
r->file_info.version = 1;
return s;
}
Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
Rep* r = rep_;
if (!r->builder) {
return Status::InvalidArgument("File is not opened");
}
if (r->file_info.num_entries == 0) {
r->file_info.smallest_key = user_key.ToString();
} else {
if (r->internal_comparator.user_comparator()->Compare(
user_key, r->file_info.largest_key) <= 0) {
// Make sure that keys are added in order
return Status::InvalidArgument("Keys must be added in order");
}
}
// update file info
r->file_info.num_entries++;
r->file_info.largest_key = user_key.ToString();
r->file_info.file_size = r->builder->FileSize();
InternalKey ikey(user_key, 0 /* Sequence Number */,
ValueType::kTypeValue /* Put */);
r->builder->Add(ikey.Encode(), value);
return Status::OK();
}
Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
Rep* r = rep_;
if (!r->builder) {
return Status::InvalidArgument("File is not opened");
}
Status s = r->builder->Finish();
if (s.ok()) {
if (!r->ioptions.disable_data_sync) {
s = r->file_writer->Sync(r->ioptions.use_fsync);
}
if (s.ok()) {
s = r->file_writer->Close();
}
} else {
r->builder->Abandon();
}
if (!s.ok()) {
r->ioptions.env->DeleteFile(r->file_info.file_path);
}
if (s.ok() && file_info != nullptr) {
r->file_info.file_size = r->builder->FileSize();
*file_info = r->file_info;
}
r->builder.reset();
return s;
}
} // namespace rocksdb
Loading…
Cancel
Save