From f03b5c987bb61f635c510ccd6bcb8a98bfcc41a8 Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Wed, 23 Sep 2015 12:42:43 -0700 Subject: [PATCH] Add experimental DB::AddFile() to plug sst files into empty DB Summary: This is an initial version of bulk load feature This diff allow us to create sst files, and then bulk load them later, right now the restrictions for loading an sst file are (1) Memtables are empty (2) Added sst files have sequence number = 0, and existing values in database have sequence number = 0 (3) Added sst files values are not overlapping Test Plan: unit testing Reviewers: igor, ott, sdong Reviewed By: sdong Subscribers: leveldb, ott, dhruba Differential Revision: https://reviews.facebook.net/D39081 --- HISTORY.md | 3 + db/db_impl.cc | 216 +++++++++++++++ db/db_impl.h | 8 + db/db_test.cc | 325 +++++++++++++++++++++++ db/table_properties_collector.h | 2 +- include/rocksdb/db.h | 31 +++ include/rocksdb/sst_file_writer.h | 77 ++++++ include/rocksdb/table.h | 6 +- include/rocksdb/utilities/stackable_db.h | 12 + src.mk | 1 + table/sst_file_writer.cc | 188 +++++++++++++ 11 files changed, 866 insertions(+), 3 deletions(-) create mode 100644 include/rocksdb/sst_file_writer.h create mode 100644 table/sst_file_writer.cc diff --git a/HISTORY.md b/HISTORY.md index 82656e2a0..81c27f8f9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,9 +3,12 @@ ## Unreleased ### New Features * Added single delete operation as a more efficient way to delete keys that have not been overwritten. +* Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info. ### Public API Changes * Added SingleDelete() to the DB interface. +* Added AddFile() to DB interface. +* Added SstFileWriter class. ## 4.0.0 (9/9/2015) ### New Features diff --git a/db/db_impl.cc b/db/db_impl.cc index c2219b1f7..f4ff97625 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -58,6 +58,7 @@ #include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -3071,6 +3072,221 @@ std::vector DBImpl::MultiGet( return stat_list; } +#ifndef ROCKSDB_LITE +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + + ExternalSstFileInfo file_info; + file_info.file_path = file_path; + status = env_->GetFileSize(file_path, &file_info.file_size); + if (!status.ok()) { + return status; + } + + // Access the file using TableReader to extract + // version, number of entries, smallest user key, largest user key + std::unique_ptr sst_file; + status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + std::unique_ptr sst_file_reader; + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); + + std::unique_ptr table_reader; + status = cfd->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd->ioptions(), env_options_, + cfd->internal_comparator()), + std::move(sst_file_reader), file_info.file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external sst file version from table properties + const UserCollectedProperties& user_collected_properties = + table_reader->GetTableProperties()->user_collected_properties; + UserCollectedProperties::const_iterator external_sst_file_version_iter = + user_collected_properties.find(ExternalSstFilePropertyNames::kVersion); + if (external_sst_file_version_iter == user_collected_properties.end()) { + return Status::InvalidArgument("Generated table version not found"); + } + + file_info.version = + DecodeFixed32(external_sst_file_version_iter->second.c_str()); + if (file_info.version == 1) { + // version 1 imply that all sequence numbers in table equal 0 + file_info.sequence_number = 0; + } else { + return Status::InvalidArgument("Generated table version is not supported"); + } + + // Get number of entries in table + file_info.num_entries = table_reader->GetTableProperties()->num_entries; + + ParsedInternalKey key; + std::unique_ptr iter(table_reader->NewIterator(ReadOptions())); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.smallest_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.largest_key = key.user_key.ToString(); + + return AddFile(column_family, &file_info, move_file); +} + +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + + if (cfd->NumberLevels() <= 1) { + return Status::NotSupported( + "AddFile requires a database with at least 2 levels"); + } + if (file_info->version != 1) { + return Status::InvalidArgument("Generated table version is not supported"); + } + // version 1 imply that file have only Put Operations with Sequence Number = 0 + + FileMetaData meta; + meta.smallest = + InternalKey(file_info->smallest_key, file_info->sequence_number, + ValueType::kTypeValue); + meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number, + ValueType::kTypeValue); + if (!meta.smallest.Valid() || !meta.largest.Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + meta.smallest_seqno = file_info->sequence_number; + meta.largest_seqno = file_info->sequence_number; + if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) { + return Status::InvalidArgument( + "Non zero sequence numbers are not supported"); + } + // Generate a location for the new table + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size); + std::string db_fname = TableFileName( + db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); + + if (move_file) { + status = env_->LinkFile(file_info->file_path, db_fname); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + } else { + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + if (!status.ok()) { + return status; + } + + { + InstrumentedMutexLock l(&mutex_); + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + + // Make sure memtables are empty + if (!cfd->mem()->IsEmpty() || cfd->imm()->NumNotFlushed() > 0) { + // Cannot add the file since the keys in memtable + // will hide the keys in file + status = Status::NotSupported("Memtable is not empty"); + } + + // Make sure last sequence number is 0, if there are existing files then + // they should have sequence number = 0 + if (status.ok() && versions_->LastSequence() > 0) { + status = Status::NotSupported("Last Sequence number is not zero"); + } + + auto* vstorage = cfd->current()->storage_info(); + if (status.ok()) { + // Make sure that the key range in the file we will add does not overlap + // with previously added files + Slice smallest_user_key = meta.smallest.user_key(); + Slice largest_user_key = meta.largest.user_key(); + for (int level = 0; level < vstorage->num_non_empty_levels(); level++) { + if (vstorage->OverlapInLevel(level, &smallest_user_key, + &largest_user_key)) { + status = Status::NotSupported("Cannot add overlapping files"); + break; + } + } + } + + if (status.ok()) { + // We add the file to the last level + int target_level = cfd->NumberLevels() - 1; + if (cfd->ioptions()->level_compaction_dynamic_level_bytes == false) { + // If we are using dynamic level compaction we add the file to + // last level with files + target_level = vstorage->num_non_empty_levels() - 1; + if (target_level <= 0) { + target_level = 1; + } + } + VersionEdit edit; + edit.SetColumnFamily(cfd->GetID()); + edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno, + meta.marked_for_compaction); + + status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, + directories_.GetDbDir()); + } + write_thread_.ExitUnbatched(&w); + + if (status.ok()) { + delete InstallSuperVersionAndScheduleWork(cfd, nullptr, + mutable_cf_options); + } + } + + if (!status.ok()) { + // We failed to add the file to the database + Status s = env_->DeleteFile(db_fname); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "AddFile() clean up for file %s failed : %s", db_fname.c_str(), + s.ToString().c_str()); + } + } else if (status.ok() && move_file) { + // The file was moved and added successfully, remove original file link + Status s = env_->DeleteFile(file_info->file_path); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "%s was added to DB successfully but failed to remove original file " + "link : %s", + file_info->file_path.c_str(), s.ToString().c_str()); + } + } + return status; +} +#endif // ROCKSDB_LITE + Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { diff --git a/db/db_impl.h b/db/db_impl.h index 18a2b34a8..634d58577 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -55,6 +55,7 @@ class CompactionFilterV2; class Arena; class WriteCallback; struct JobContext; +struct ExternalSstFileInfo; class DBImpl : public DB { public: @@ -225,6 +226,13 @@ class DBImpl : public DB { Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key, SequenceNumber* seq); + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file) override; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, bool move_file) override; + #endif // ROCKSDB_LITE // checks if all live files exist on file system and that their file sizes diff --git a/db/db_test.cc b/db/db_test.cc index 4d7b6276d..f02835fa8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -44,6 +44,7 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/snapshot.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "rocksdb/thread_status.h" @@ -5548,6 +5549,18 @@ class ModelDB: public DB { return s; } + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_path, + bool move_file) override { + return Status::NotSupported("Not implemented."); + } + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file) override { + return Status::NotSupported("Not implemented."); + } + using DB::GetPropertiesOfAllTables; virtual Status GetPropertiesOfAllTables( ColumnFamilyHandle* column_family, @@ -9301,6 +9314,318 @@ TEST_F(DBTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +TEST_F(DBTest, AddExternalSstFile) { + do { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.env = env_; + const ImmutableCFOptions ioptions(options); + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + // file1.sst (0 => 99) + std::string file1 = sst_files_folder + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + // sst_file_writer already finished, cannot add this value + s = sst_file_writer.Add(Key(100), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // file2.sst (100 => 199) + std::string file2 = sst_files_folder + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 200; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + // Cannot add this key because it's not after last added key + s = sst_file_writer.Add(Key(99), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 100); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(199)); + + // file3.sst (195 => 299) + // This file values overlap with file2 values + std::string file3 = sst_files_folder + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 195; k < 300; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 105); + ASSERT_EQ(file3_info.smallest_key, Key(195)); + ASSERT_EQ(file3_info.largest_key, Key(299)); + + // file4.sst (30 => 39) + // This file values overlap with file1 values + std::string file4 = sst_files_folder + "file4.sst"; + ASSERT_OK(sst_file_writer.Open(file4)); + for (int k = 30; k < 40; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file4_info; + s = sst_file_writer.Finish(&file4_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file4_info.file_path, file4); + ASSERT_EQ(file4_info.num_entries, 10); + ASSERT_EQ(file4_info.smallest_key, Key(30)); + ASSERT_EQ(file4_info.largest_key, Key(39)); + + // file5.sst (400 => 499) + std::string file5 = sst_files_folder + "file5.sst"; + ASSERT_OK(sst_file_writer.Open(file5)); + for (int k = 400; k < 500; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file5_info; + s = sst_file_writer.Finish(&file5_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file5_info.file_path, file5); + ASSERT_EQ(file5_info.num_entries, 100); + ASSERT_EQ(file5_info.smallest_key, Key(400)); + ASSERT_EQ(file5_info.largest_key, Key(499)); + + DestroyAndReopen(options); + // Add file using file path + s = db_->AddFile(file1); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); + for (int k = 0; k < 100; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // Add file using file info + s = db_->AddFile(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); + for (int k = 0; k < 200; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // This file have overlapping values with the exisitng data + s = db_->AddFile(file3); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // This file have overlapping values with the exisitng data + s = db_->AddFile(&file4_info); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Overwrite values of keys divisible by 5 + for (int k = 0; k < 200; k += 5) { + ASSERT_OK(Put(Key(k), Key(k) + "_val_new")); + } + ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); + + // DB have values in memtable now, we cannot add files anymore + s = db_->AddFile(file5); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Make sure values are correct before and after flush/compaction + for (int i = 0; i < 2; i++) { + for (int k = 0; k < 200; k++) { + std::string value = Key(k) + "_val"; + if (k % 5 == 0) { + value += "_new"; + } + ASSERT_EQ(Get(Key(k)), value); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + + // DB sequence number is not zero, cannot add files anymore + s = db_->AddFile(file5); + ASSERT_FALSE(s.ok()) << s.ToString(); + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + +TEST_F(DBTest, AddExternalSstFileNoCopy) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.env = env_; + const ImmutableCFOptions ioptions(options); + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + // file1.sst (0 => 99) + std::string file1 = sst_files_folder + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + + // file2.sst (100 => 299) + std::string file2 = sst_files_folder + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 300; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 200); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(299)); + + // file3.sst (110 => 124) .. overlap with file2.sst + std::string file3 = sst_files_folder + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 110; k < 125; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 15); + ASSERT_EQ(file3_info.smallest_key, Key(110)); + ASSERT_EQ(file3_info.largest_key, Key(124)); + + s = db_->AddFile(&file1_info, true /* move file */); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(Status::NotFound(), env_->FileExists(file1)); + + s = db_->AddFile(&file2_info, false /* copy file */); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file2)); + + // This file have overlapping values with the exisitng data + s = db_->AddFile(&file3_info, true /* move file */); + ASSERT_FALSE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file3)); + + for (int k = 0; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } +} + +TEST_F(DBTest, AddExternalSstFileMultiThreaded) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + // Bulk load 10 files every file contain 1000 keys + int num_files = 10; + int keys_per_file = 1000; + + // Generate file names + std::vector file_names; + for (int i = 0; i < num_files; i++) { + std::string file_name = "file_" + ToString(i) + ".sst"; + file_names.push_back(sst_files_folder + file_name); + } + + do { + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + const ImmutableCFOptions ioptions(options); + + std::atomic thread_num(0); + std::function write_file_func = [&]() { + int file_idx = thread_num.fetch_add(1); + int range_start = file_idx * keys_per_file; + int range_end = range_start + keys_per_file; + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); + + for (int k = range_start; k < range_end; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k))); + } + + Status s = sst_file_writer.Finish(); + ASSERT_TRUE(s.ok()) << s.ToString(); + }; + // Write num_files files in parallel + std::vector sst_writer_threads; + for (int i = 0; i < num_files; ++i) { + sst_writer_threads.emplace_back(write_file_func); + } + + for (auto& t : sst_writer_threads) { + t.join(); + } + + fprintf(stderr, "Wrote %d files (%d keys)\n", num_files, + num_files * keys_per_file); + + thread_num.store(0); + std::atomic files_added(0); + std::function load_file_func = [&]() { + // We intentionally add every file twice, and assert that it was added + // only once and the other add failed + int thread_id = thread_num.fetch_add(1); + int file_idx = thread_id / 2; + // sometimes we use copy, sometimes link .. the result should be the same + bool move_file = (thread_id % 3 == 0); + + Status s = db_->AddFile(file_names[file_idx], move_file); + if (s.ok()) { + files_added++; + } + }; + // Bulk load num_files files in parallel + std::vector add_file_threads; + DestroyAndReopen(options); + for (int i = 0; i < num_files * 2; ++i) { + add_file_threads.emplace_back(load_file_func); + } + + for (auto& t : add_file_threads) { + t.join(); + } + ASSERT_EQ(files_added.load(), num_files); + fprintf(stderr, "Loaded %d files (%d keys)\n", num_files, + num_files * keys_per_file); + + // Overwrite values of keys divisible by 100 + for (int k = 0; k < num_files * keys_per_file; k += 100) { + std::string key = Key(k); + Status s = Put(key, key + "_new"); + ASSERT_TRUE(s.ok()); + } + + for (int i = 0; i < 2; i++) { + // Make sure the values are correct before and after flush/compaction + for (int k = 0; k < num_files * keys_per_file; ++k) { + std::string key = Key(k); + std::string value = (k % 100 == 0) ? (key + "_new") : key; + ASSERT_EQ(Get(key), value); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + + fprintf(stderr, "Verified %d values\n", num_files * keys_per_file); + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, ::testing::Values(1, 4)); diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 4bd01fd16..51c2ba915 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -36,7 +36,7 @@ class IntTblPropCollector { virtual bool NeedCompact() const { return false; } }; -// Facrtory for internal table properties collector. +// Factory for internal table properties collector. class IntTblPropCollectorFactory { public: virtual ~IntTblPropCollectorFactory() {} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index c73720951..7038a781a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -42,6 +42,7 @@ struct FlushOptions; struct CompactionOptions; struct CompactRangeOptions; struct TableProperties; +struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; @@ -655,6 +656,36 @@ class DB { ColumnFamilyMetaData* metadata) { GetColumnFamilyMetaData(DefaultColumnFamily(), metadata); } + + // Load table file located at "file_path" into "column_family", a pointer to + // ExternalSstFileInfo can be used instead of "file_path" to do a blind add + // that wont need to read the file, move_file can be set to true to + // move the file instead of copying it. + // + // Current Requirements: + // (1) Memtable is empty. + // (2) All existing files (if any) have sequence number = 0. + // (3) Key range in loaded table file don't overlap with existing + // files key ranges. + // (4) No other writes happen during AddFile call, otherwise + // DB may get corrupted. + // (5) Database have at least 2 levels. + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file = false) = 0; + virtual Status AddFile(const std::string& file_path, bool move_file = false) { + return AddFile(DefaultColumnFamily(), file_path, move_file); + } + + // Load table file with information "file_info" into "column_family" + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file = false) = 0; + virtual Status AddFile(const ExternalSstFileInfo* file_info, + bool move_file = false) { + return AddFile(DefaultColumnFamily(), file_info, move_file); + } + #endif // ROCKSDB_LITE // Sets the globally unique ID created at database creation time by invoking diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h new file mode 100644 index 000000000..eb2f89491 --- /dev/null +++ b/include/rocksdb/sst_file_writer.h @@ -0,0 +1,77 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#include +#include "rocksdb/env.h" +#include "rocksdb/immutable_options.h" +#include "rocksdb/types.h" + +namespace rocksdb { + +class Comparator; + +// Table Properties that are specific to tables created by SstFileWriter. +struct ExternalSstFilePropertyNames { + // value of this property is a fixed int32 number. + static const std::string kVersion; +}; + +// ExternalSstFileInfo include information about sst files created +// using SstFileWriter +struct ExternalSstFileInfo { + ExternalSstFileInfo() {} + ExternalSstFileInfo(const std::string& _file_path, + const std::string& _smallest_key, + const std::string& _largest_key, + SequenceNumber _sequence_number, uint64_t _file_size, + int32_t _num_entries, int32_t _version) + : file_path(_file_path), + smallest_key(_smallest_key), + largest_key(_largest_key), + sequence_number(_sequence_number), + file_size(_file_size), + num_entries(_num_entries), + version(_version) {} + + std::string file_path; // external sst file path + std::string smallest_key; // smallest user key in file + std::string largest_key; // largest user key in file + SequenceNumber sequence_number; // sequence number of all keys in file + uint64_t file_size; // file size in bytes + uint64_t num_entries; // number of entries in file + int32_t version; // file version +}; + +// SstFileWriter is used to create sst files that can be added to database later +// All keys in files generated by SstFileWriter will have sequence number = 0 +class SstFileWriter { + public: + SstFileWriter(const EnvOptions& env_options, + const ImmutableCFOptions& ioptions, + const Comparator* user_comparator); + + ~SstFileWriter(); + + // Prepare SstFileWriter to write into file located at "file_path". + Status Open(const std::string& file_path); + + // Add key, value to currently opened file + // REQUIRES: key is after any previously added key according to comparator. + Status Add(const Slice& user_key, const Slice& value); + + // Finalize writing to sst file and close file. + // + // An optional ExternalSstFileInfo pointer can be passed to the function + // which will be populated with information about the created sst file + Status Finish(ExternalSstFileInfo* file_info = nullptr); + + private: + class SstFileWriterPropertiesCollectorFactory; + class SstFileWriterPropertiesCollector; + struct Rep; + Rep* rep_; +}; +} // namespace rocksdb diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index d642319f1..e52b58099 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -336,11 +336,13 @@ class TableFactory { // in parameter file. It's the caller's responsibility to make sure // file is in the correct format. // - // NewTableReader() is called in two places: + // NewTableReader() is called in three places: // (1) TableCache::FindTable() calls the function when table cache miss // and cache the table object returned. - // (1) SstFileReader (for SST Dump) opens the table and dump the table + // (2) SstFileReader (for SST Dump) opens the table and dump the table // contents using the interator of the table. + // (3) DBImpl::AddFile() calls this function to read the contents of + // the sst file it's attempting to add // // table_reader_options is a TableReaderOptions which contain all the // needed parameters and configuration to open the table. diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 2077f8f87..95eddc62d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -63,6 +63,18 @@ class StackableDB : public DB { return db_->MultiGet(options, column_family, keys, values); } + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file) override { + return db_->AddFile(column_family, file_info, move_file); + } + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file) override { + return db_->AddFile(column_family, file_path, move_file); + } + using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/src.mk b/src.mk index 22b8ff9fa..1fc4c139d 100644 --- a/src.mk +++ b/src.mk @@ -68,6 +68,7 @@ LIB_SOURCES = \ table/iterator.cc \ table/merger.cc \ table/meta_blocks.cc \ + table/sst_file_writer.cc \ table/plain_table_builder.cc \ table/plain_table_factory.cc \ table/plain_table_index.cc \ diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc new file mode 100644 index 000000000..d780f0a4b --- /dev/null +++ b/table/sst_file_writer.cc @@ -0,0 +1,188 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "rocksdb/sst_file_writer.h" + +#include +#include "db/dbformat.h" +#include "rocksdb/table.h" +#include "table/block_based_table_builder.h" +#include "util/file_reader_writer.h" +#include "util/string_util.h" + +namespace rocksdb { + +const std::string ExternalSstFilePropertyNames::kVersion = + "rocksdb.external_sst_file.version"; + +// PropertiesCollector used to add properties specific to tables +// generated by SstFileWriter +class SstFileWriter::SstFileWriterPropertiesCollector + : public IntTblPropCollector { + public: + explicit SstFileWriterPropertiesCollector(int32_t version) + : version_(version) {} + + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override { + // Intentionally left blank. Have no interest in collecting stats for + // individual key/value pairs. + return Status::OK(); + } + + virtual Status Finish(UserCollectedProperties* properties) override { + std::string version_val; + PutFixed32(&version_val, static_cast(version_)); + properties->insert({ExternalSstFilePropertyNames::kVersion, version_val}); + return Status::OK(); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}}; + } + + private: + int32_t version_; +}; + +class SstFileWriter::SstFileWriterPropertiesCollectorFactory + : public IntTblPropCollectorFactory { + public: + explicit SstFileWriterPropertiesCollectorFactory(int32_t version) + : version_(version) {} + + virtual IntTblPropCollector* CreateIntTblPropCollector() override { + return new SstFileWriterPropertiesCollector(version_); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + private: + int32_t version_; +}; + +struct SstFileWriter::Rep { + Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions, + const Comparator* _user_comparator) + : env_options(_env_options), + ioptions(_ioptions), + internal_comparator(_user_comparator) {} + + std::unique_ptr file_writer; + std::unique_ptr builder; + EnvOptions env_options; + ImmutableCFOptions ioptions; + InternalKeyComparator internal_comparator; + ExternalSstFileInfo file_info; +}; + +SstFileWriter::SstFileWriter(const EnvOptions& env_options, + const ImmutableCFOptions& ioptions, + const Comparator* user_comparator) + : rep_(new Rep(env_options, ioptions, user_comparator)) {} + +SstFileWriter::~SstFileWriter() { delete rep_; } + +Status SstFileWriter::Open(const std::string& file_path) { + Rep* r = rep_; + Status s; + std::unique_ptr sst_file; + s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); + if (!s.ok()) { + return s; + } + + CompressionType compression_type = r->ioptions.compression; + if (!r->ioptions.compression_per_level.empty()) { + // Use the compression of the last level if we have per level compression + compression_type = *(r->ioptions.compression_per_level.rbegin()); + } + + std::vector> + int_tbl_prop_collector_factories; + int_tbl_prop_collector_factories.emplace_back( + new SstFileWriterPropertiesCollectorFactory(1 /* version */)); + + TableBuilderOptions table_builder_options( + r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, + compression_type, r->ioptions.compression_opts, false); + r->file_writer.reset( + new WritableFileWriter(std::move(sst_file), r->env_options)); + r->builder.reset(r->ioptions.table_factory->NewTableBuilder( + table_builder_options, r->file_writer.get())); + + r->file_info.file_path = file_path; + r->file_info.file_size = 0; + r->file_info.num_entries = 0; + r->file_info.sequence_number = 0; + r->file_info.version = 1; + return s; +} + +Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { + Rep* r = rep_; + if (!r->builder) { + return Status::InvalidArgument("File is not opened"); + } + + if (r->file_info.num_entries == 0) { + r->file_info.smallest_key = user_key.ToString(); + } else { + if (r->internal_comparator.user_comparator()->Compare( + user_key, r->file_info.largest_key) <= 0) { + // Make sure that keys are added in order + return Status::InvalidArgument("Keys must be added in order"); + } + } + + // update file info + r->file_info.num_entries++; + r->file_info.largest_key = user_key.ToString(); + r->file_info.file_size = r->builder->FileSize(); + + InternalKey ikey(user_key, 0 /* Sequence Number */, + ValueType::kTypeValue /* Put */); + r->builder->Add(ikey.Encode(), value); + + return Status::OK(); +} + +Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { + Rep* r = rep_; + if (!r->builder) { + return Status::InvalidArgument("File is not opened"); + } + + Status s = r->builder->Finish(); + if (s.ok()) { + if (!r->ioptions.disable_data_sync) { + s = r->file_writer->Sync(r->ioptions.use_fsync); + } + if (s.ok()) { + s = r->file_writer->Close(); + } + } else { + r->builder->Abandon(); + } + + if (!s.ok()) { + r->ioptions.env->DeleteFile(r->file_info.file_path); + } + + if (s.ok() && file_info != nullptr) { + r->file_info.file_size = r->builder->FileSize(); + *file_info = r->file_info; + } + + r->builder.reset(); + return s; +} +} // namespace rocksdb