diff --git a/HISTORY.md b/HISTORY.md index 82656e2a0..81c27f8f9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,9 +3,12 @@ ## Unreleased ### New Features * Added single delete operation as a more efficient way to delete keys that have not been overwritten. +* Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info. ### Public API Changes * Added SingleDelete() to the DB interface. +* Added AddFile() to DB interface. +* Added SstFileWriter class. ## 4.0.0 (9/9/2015) ### New Features diff --git a/db/db_impl.cc b/db/db_impl.cc index c2219b1f7..f4ff97625 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -58,6 +58,7 @@ #include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" @@ -3071,6 +3072,221 @@ std::vector DBImpl::MultiGet( return stat_list; } +#ifndef ROCKSDB_LITE +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + + ExternalSstFileInfo file_info; + file_info.file_path = file_path; + status = env_->GetFileSize(file_path, &file_info.file_size); + if (!status.ok()) { + return status; + } + + // Access the file using TableReader to extract + // version, number of entries, smallest user key, largest user key + std::unique_ptr sst_file; + status = env_->NewRandomAccessFile(file_path, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + std::unique_ptr sst_file_reader; + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file))); + + std::unique_ptr table_reader; + status = cfd->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd->ioptions(), env_options_, + cfd->internal_comparator()), + std::move(sst_file_reader), file_info.file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external sst file version from table properties + const UserCollectedProperties& user_collected_properties = + table_reader->GetTableProperties()->user_collected_properties; + UserCollectedProperties::const_iterator external_sst_file_version_iter = + user_collected_properties.find(ExternalSstFilePropertyNames::kVersion); + if (external_sst_file_version_iter == user_collected_properties.end()) { + return Status::InvalidArgument("Generated table version not found"); + } + + file_info.version = + DecodeFixed32(external_sst_file_version_iter->second.c_str()); + if (file_info.version == 1) { + // version 1 imply that all sequence numbers in table equal 0 + file_info.sequence_number = 0; + } else { + return Status::InvalidArgument("Generated table version is not supported"); + } + + // Get number of entries in table + file_info.num_entries = table_reader->GetTableProperties()->num_entries; + + ParsedInternalKey key; + std::unique_ptr iter(table_reader->NewIterator(ReadOptions())); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.smallest_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if 
(!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("Generated table have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("Generated table have non zero sequence number"); + } + file_info.largest_key = key.user_key.ToString(); + + return AddFile(column_family, &file_info, move_file); +} + +Status DBImpl::AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, bool move_file) { + Status status; + auto cfh = reinterpret_cast(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + + if (cfd->NumberLevels() <= 1) { + return Status::NotSupported( + "AddFile requires a database with at least 2 levels"); + } + if (file_info->version != 1) { + return Status::InvalidArgument("Generated table version is not supported"); + } + // version 1 imply that file have only Put Operations with Sequence Number = 0 + + FileMetaData meta; + meta.smallest = + InternalKey(file_info->smallest_key, file_info->sequence_number, + ValueType::kTypeValue); + meta.largest = InternalKey(file_info->largest_key, file_info->sequence_number, + ValueType::kTypeValue); + if (!meta.smallest.Valid() || !meta.largest.Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + meta.smallest_seqno = file_info->sequence_number; + meta.largest_seqno = file_info->sequence_number; + if (meta.smallest_seqno != 0 || meta.largest_seqno != 0) { + return Status::InvalidArgument( + "Non zero sequence numbers are not supported"); + } + // Generate a location for the new table + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, file_info->file_size); + std::string db_fname = TableFileName( + db_options_.db_paths, meta.fd.GetNumber(), meta.fd.GetPathId()); + + if (move_file) { + status = env_->LinkFile(file_info->file_path, db_fname); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + } else { + status = CopyFile(env_, file_info->file_path, db_fname, 0); + } + if (!status.ok()) { + return status; + } + + { + InstrumentedMutexLock l(&mutex_); + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + + // Make sure memtables are empty + if (!cfd->mem()->IsEmpty() || cfd->imm()->NumNotFlushed() > 0) { + // Cannot add the file since the keys in memtable + // will hide the keys in file + status = Status::NotSupported("Memtable is not empty"); + } + + // Make sure last sequence number is 0, if there are existing files then + // they should have sequence number = 0 + if (status.ok() && versions_->LastSequence() > 0) { + status = Status::NotSupported("Last Sequence number is not zero"); + } + + auto* vstorage = cfd->current()->storage_info(); + if (status.ok()) { + // Make sure that the key range in the file we will add does not overlap + // with previously added files + Slice smallest_user_key = meta.smallest.user_key(); + Slice largest_user_key = meta.largest.user_key(); + for (int level = 0; level < vstorage->num_non_empty_levels(); level++) { + if (vstorage->OverlapInLevel(level, &smallest_user_key, + &largest_user_key)) { + status = Status::NotSupported("Cannot add overlapping files"); + break; + } + } + } + + if (status.ok()) { + // We add the file to the last level + int target_level = cfd->NumberLevels() - 1; + if (cfd->ioptions()->level_compaction_dynamic_level_bytes == false) { + // If we are using dynamic level compaction 
we add the file to + // last level with files + target_level = vstorage->num_non_empty_levels() - 1; + if (target_level <= 0) { + target_level = 1; + } + } + VersionEdit edit; + edit.SetColumnFamily(cfd->GetID()); + edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno, + meta.marked_for_compaction); + + status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, + directories_.GetDbDir()); + } + write_thread_.ExitUnbatched(&w); + + if (status.ok()) { + delete InstallSuperVersionAndScheduleWork(cfd, nullptr, + mutable_cf_options); + } + } + + if (!status.ok()) { + // We failed to add the file to the database + Status s = env_->DeleteFile(db_fname); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "AddFile() clean up for file %s failed : %s", db_fname.c_str(), + s.ToString().c_str()); + } + } else if (status.ok() && move_file) { + // The file was moved and added successfully, remove original file link + Status s = env_->DeleteFile(file_info->file_path); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, + "%s was added to DB successfully but failed to remove original file " + "link : %s", + file_info->file_path.c_str(), s.ToString().c_str()); + } + } + return status; +} +#endif // ROCKSDB_LITE + Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { diff --git a/db/db_impl.h b/db/db_impl.h index 18a2b34a8..634d58577 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -55,6 +55,7 @@ class CompactionFilterV2; class Arena; class WriteCallback; struct JobContext; +struct ExternalSstFileInfo; class DBImpl : public DB { public: @@ -225,6 +226,13 @@ class DBImpl : public DB { Status GetLatestSequenceForKeyFromMemtable(SuperVersion* sv, const Slice& key, SequenceNumber* seq); + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file) override; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, bool move_file) override; + #endif // ROCKSDB_LITE // checks if all live files exist on file system and that their file sizes diff --git a/db/db_test.cc b/db/db_test.cc index 4d7b6276d..f02835fa8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -44,6 +44,7 @@ #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/snapshot.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "rocksdb/thread_status.h" @@ -5548,6 +5549,18 @@ class ModelDB: public DB { return s; } + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_path, + bool move_file) override { + return Status::NotSupported("Not implemented."); + } + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file) override { + return Status::NotSupported("Not implemented."); + } + using DB::GetPropertiesOfAllTables; virtual Status GetPropertiesOfAllTables( ColumnFamilyHandle* column_family, @@ -9301,6 +9314,318 @@ TEST_F(DBTest, GetTotalSstFilesSizeVersionsFilesShared) { ASSERT_EQ(total_sst_files_size, 0); } +TEST_F(DBTest, AddExternalSstFile) { + do { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + 
options.env = env_; + const ImmutableCFOptions ioptions(options); + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + // file1.sst (0 => 99) + std::string file1 = sst_files_folder + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + // sst_file_writer already finished, cannot add this value + s = sst_file_writer.Add(Key(100), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // file2.sst (100 => 199) + std::string file2 = sst_files_folder + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 200; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + // Cannot add this key because it's not after last added key + s = sst_file_writer.Add(Key(99), "bad_val"); + ASSERT_FALSE(s.ok()) << s.ToString(); + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 100); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(199)); + + // file3.sst (195 => 299) + // This file values overlap with file2 values + std::string file3 = sst_files_folder + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 195; k < 300; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 105); + ASSERT_EQ(file3_info.smallest_key, Key(195)); + ASSERT_EQ(file3_info.largest_key, Key(299)); + + // file4.sst (30 => 39) + // This file values overlap with file1 values + std::string file4 = sst_files_folder + "file4.sst"; + ASSERT_OK(sst_file_writer.Open(file4)); + for (int k = 30; k < 40; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file4_info; + s = sst_file_writer.Finish(&file4_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file4_info.file_path, file4); + ASSERT_EQ(file4_info.num_entries, 10); + ASSERT_EQ(file4_info.smallest_key, Key(30)); + ASSERT_EQ(file4_info.largest_key, Key(39)); + + // file5.sst (400 => 499) + std::string file5 = sst_files_folder + "file5.sst"; + ASSERT_OK(sst_file_writer.Open(file5)); + for (int k = 400; k < 500; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file5_info; + s = sst_file_writer.Finish(&file5_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file5_info.file_path, file5); + ASSERT_EQ(file5_info.num_entries, 100); + ASSERT_EQ(file5_info.smallest_key, Key(400)); + ASSERT_EQ(file5_info.largest_key, Key(499)); + + DestroyAndReopen(options); + // Add file using file path + s = db_->AddFile(file1); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); + for (int k = 0; k < 100; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // Add file using file info + s = db_->AddFile(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(db_->GetLatestSequenceNumber(), 
0U); + for (int k = 0; k < 200; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } + + // This file have overlapping values with the exisitng data + s = db_->AddFile(file3); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // This file have overlapping values with the exisitng data + s = db_->AddFile(&file4_info); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Overwrite values of keys divisible by 5 + for (int k = 0; k < 200; k += 5) { + ASSERT_OK(Put(Key(k), Key(k) + "_val_new")); + } + ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); + + // DB have values in memtable now, we cannot add files anymore + s = db_->AddFile(file5); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Make sure values are correct before and after flush/compaction + for (int i = 0; i < 2; i++) { + for (int k = 0; k < 200; k++) { + std::string value = Key(k) + "_val"; + if (k % 5 == 0) { + value += "_new"; + } + ASSERT_EQ(Get(Key(k)), value); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + + // DB sequence number is not zero, cannot add files anymore + s = db_->AddFile(file5); + ASSERT_FALSE(s.ok()) << s.ToString(); + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + +TEST_F(DBTest, AddExternalSstFileNoCopy) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + options.env = env_; + const ImmutableCFOptions ioptions(options); + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + // file1.sst (0 => 99) + std::string file1 = sst_files_folder + "file1.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 0; k < 100; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(0)); + ASSERT_EQ(file1_info.largest_key, Key(99)); + + // file2.sst (100 => 299) + std::string file2 = sst_files_folder + "file2.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 100; k < 300; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 200); + ASSERT_EQ(file2_info.smallest_key, Key(100)); + ASSERT_EQ(file2_info.largest_key, Key(299)); + + // file3.sst (110 => 124) .. 
overlap with file2.sst + std::string file3 = sst_files_folder + "file3.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 110; k < 125; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 15); + ASSERT_EQ(file3_info.smallest_key, Key(110)); + ASSERT_EQ(file3_info.largest_key, Key(124)); + + s = db_->AddFile(&file1_info, true /* move file */); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(Status::NotFound(), env_->FileExists(file1)); + + s = db_->AddFile(&file2_info, false /* copy file */); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file2)); + + // This file have overlapping values with the exisitng data + s = db_->AddFile(&file3_info, true /* move file */); + ASSERT_FALSE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file3)); + + for (int k = 0; k < 300; k++) { + ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); + } +} + +TEST_F(DBTest, AddExternalSstFileMultiThreaded) { + std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/"; + // Bulk load 10 files every file contain 1000 keys + int num_files = 10; + int keys_per_file = 1000; + + // Generate file names + std::vector file_names; + for (int i = 0; i < num_files; i++) { + std::string file_name = "file_" + ToString(i) + ".sst"; + file_names.push_back(sst_files_folder + file_name); + } + + do { + env_->CreateDir(sst_files_folder); + Options options = CurrentOptions(); + const ImmutableCFOptions ioptions(options); + + std::atomic thread_num(0); + std::function write_file_func = [&]() { + int file_idx = thread_num.fetch_add(1); + int range_start = file_idx * keys_per_file; + int range_end = range_start + keys_per_file; + + SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); + + for (int k = range_start; k < range_end; k++) { + ASSERT_OK(sst_file_writer.Add(Key(k), Key(k))); + } + + Status s = sst_file_writer.Finish(); + ASSERT_TRUE(s.ok()) << s.ToString(); + }; + // Write num_files files in parallel + std::vector sst_writer_threads; + for (int i = 0; i < num_files; ++i) { + sst_writer_threads.emplace_back(write_file_func); + } + + for (auto& t : sst_writer_threads) { + t.join(); + } + + fprintf(stderr, "Wrote %d files (%d keys)\n", num_files, + num_files * keys_per_file); + + thread_num.store(0); + std::atomic files_added(0); + std::function load_file_func = [&]() { + // We intentionally add every file twice, and assert that it was added + // only once and the other add failed + int thread_id = thread_num.fetch_add(1); + int file_idx = thread_id / 2; + // sometimes we use copy, sometimes link .. 
the result should be the same + bool move_file = (thread_id % 3 == 0); + + Status s = db_->AddFile(file_names[file_idx], move_file); + if (s.ok()) { + files_added++; + } + }; + // Bulk load num_files files in parallel + std::vector add_file_threads; + DestroyAndReopen(options); + for (int i = 0; i < num_files * 2; ++i) { + add_file_threads.emplace_back(load_file_func); + } + + for (auto& t : add_file_threads) { + t.join(); + } + ASSERT_EQ(files_added.load(), num_files); + fprintf(stderr, "Loaded %d files (%d keys)\n", num_files, + num_files * keys_per_file); + + // Overwrite values of keys divisible by 100 + for (int k = 0; k < num_files * keys_per_file; k += 100) { + std::string key = Key(k); + Status s = Put(key, key + "_new"); + ASSERT_TRUE(s.ok()); + } + + for (int i = 0; i < 2; i++) { + // Make sure the values are correct before and after flush/compaction + for (int k = 0; k < num_files * keys_per_file; ++k) { + std::string key = Key(k); + std::string value = (k % 100 == 0) ? (key + "_new") : key; + ASSERT_EQ(Get(key), value); + } + ASSERT_OK(Flush()); + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + } + + fprintf(stderr, "Verified %d values\n", num_files * keys_per_file); + } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | + kSkipFIFOCompaction)); +} + INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam, ::testing::Values(1, 4)); diff --git a/db/table_properties_collector.h b/db/table_properties_collector.h index 4bd01fd16..51c2ba915 100644 --- a/db/table_properties_collector.h +++ b/db/table_properties_collector.h @@ -36,7 +36,7 @@ class IntTblPropCollector { virtual bool NeedCompact() const { return false; } }; -// Facrtory for internal table properties collector. +// Factory for internal table properties collector. class IntTblPropCollectorFactory { public: virtual ~IntTblPropCollectorFactory() {} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index c73720951..7038a781a 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -42,6 +42,7 @@ struct FlushOptions; struct CompactionOptions; struct CompactRangeOptions; struct TableProperties; +struct ExternalSstFileInfo; class WriteBatch; class Env; class EventListener; @@ -655,6 +656,36 @@ class DB { ColumnFamilyMetaData* metadata) { GetColumnFamilyMetaData(DefaultColumnFamily(), metadata); } + + // Load table file located at "file_path" into "column_family", a pointer to + // ExternalSstFileInfo can be used instead of "file_path" to do a blind add + // that wont need to read the file, move_file can be set to true to + // move the file instead of copying it. + // + // Current Requirements: + // (1) Memtable is empty. + // (2) All existing files (if any) have sequence number = 0. + // (3) Key range in loaded table file don't overlap with existing + // files key ranges. + // (4) No other writes happen during AddFile call, otherwise + // DB may get corrupted. + // (5) Database have at least 2 levels. 
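+  //
+  // Example usage (illustrative sketch only; "options" and "db" are assumed
+  // to be set up elsewhere, the path is a placeholder, and error handling is
+  // omitted). The file is built with SstFileWriter and then added here:
+  //
+  //   SstFileWriter writer(EnvOptions(), ImmutableCFOptions(options),
+  //                        options.comparator);
+  //   writer.Open("/path/to/file1.sst");
+  //   writer.Add("key1", "val1");   // keys must be added in sorted order
+  //   writer.Add("key2", "val2");
+  //   ExternalSstFileInfo file_info;
+  //   writer.Finish(&file_info);
+  //   db->AddFile(&file_info, true /* move_file */);
+  //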
+ virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file = false) = 0; + virtual Status AddFile(const std::string& file_path, bool move_file = false) { + return AddFile(DefaultColumnFamily(), file_path, move_file); + } + + // Load table file with information "file_info" into "column_family" + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file = false) = 0; + virtual Status AddFile(const ExternalSstFileInfo* file_info, + bool move_file = false) { + return AddFile(DefaultColumnFamily(), file_info, move_file); + } + #endif // ROCKSDB_LITE // Sets the globally unique ID created at database creation time by invoking diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h new file mode 100644 index 000000000..eb2f89491 --- /dev/null +++ b/include/rocksdb/sst_file_writer.h @@ -0,0 +1,77 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#include +#include "rocksdb/env.h" +#include "rocksdb/immutable_options.h" +#include "rocksdb/types.h" + +namespace rocksdb { + +class Comparator; + +// Table Properties that are specific to tables created by SstFileWriter. +struct ExternalSstFilePropertyNames { + // value of this property is a fixed int32 number. + static const std::string kVersion; +}; + +// ExternalSstFileInfo include information about sst files created +// using SstFileWriter +struct ExternalSstFileInfo { + ExternalSstFileInfo() {} + ExternalSstFileInfo(const std::string& _file_path, + const std::string& _smallest_key, + const std::string& _largest_key, + SequenceNumber _sequence_number, uint64_t _file_size, + int32_t _num_entries, int32_t _version) + : file_path(_file_path), + smallest_key(_smallest_key), + largest_key(_largest_key), + sequence_number(_sequence_number), + file_size(_file_size), + num_entries(_num_entries), + version(_version) {} + + std::string file_path; // external sst file path + std::string smallest_key; // smallest user key in file + std::string largest_key; // largest user key in file + SequenceNumber sequence_number; // sequence number of all keys in file + uint64_t file_size; // file size in bytes + uint64_t num_entries; // number of entries in file + int32_t version; // file version +}; + +// SstFileWriter is used to create sst files that can be added to database later +// All keys in files generated by SstFileWriter will have sequence number = 0 +class SstFileWriter { + public: + SstFileWriter(const EnvOptions& env_options, + const ImmutableCFOptions& ioptions, + const Comparator* user_comparator); + + ~SstFileWriter(); + + // Prepare SstFileWriter to write into file located at "file_path". + Status Open(const std::string& file_path); + + // Add key, value to currently opened file + // REQUIRES: key is after any previously added key according to comparator. + Status Add(const Slice& user_key, const Slice& value); + + // Finalize writing to sst file and close file. 
+ // + // An optional ExternalSstFileInfo pointer can be passed to the function + // which will be populated with information about the created sst file + Status Finish(ExternalSstFileInfo* file_info = nullptr); + + private: + class SstFileWriterPropertiesCollectorFactory; + class SstFileWriterPropertiesCollector; + struct Rep; + Rep* rep_; +}; +} // namespace rocksdb diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index d642319f1..e52b58099 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -336,11 +336,13 @@ class TableFactory { // in parameter file. It's the caller's responsibility to make sure // file is in the correct format. // - // NewTableReader() is called in two places: + // NewTableReader() is called in three places: // (1) TableCache::FindTable() calls the function when table cache miss // and cache the table object returned. - // (1) SstFileReader (for SST Dump) opens the table and dump the table + // (2) SstFileReader (for SST Dump) opens the table and dump the table // contents using the interator of the table. + // (3) DBImpl::AddFile() calls this function to read the contents of + // the sst file it's attempting to add // // table_reader_options is a TableReaderOptions which contain all the // needed parameters and configuration to open the table. diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 2077f8f87..95eddc62d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -63,6 +63,18 @@ class StackableDB : public DB { return db_->MultiGet(options, column_family, keys, values); } + using DB::AddFile; + virtual Status AddFile(ColumnFamilyHandle* column_family, + const ExternalSstFileInfo* file_info, + bool move_file) override { + return db_->AddFile(column_family, file_info, move_file); + } + virtual Status AddFile(ColumnFamilyHandle* column_family, + const std::string& file_path, + bool move_file) override { + return db_->AddFile(column_family, file_path, move_file); + } + using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/src.mk b/src.mk index 22b8ff9fa..1fc4c139d 100644 --- a/src.mk +++ b/src.mk @@ -68,6 +68,7 @@ LIB_SOURCES = \ table/iterator.cc \ table/merger.cc \ table/meta_blocks.cc \ + table/sst_file_writer.cc \ table/plain_table_builder.cc \ table/plain_table_factory.cc \ table/plain_table_index.cc \ diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc new file mode 100644 index 000000000..d780f0a4b --- /dev/null +++ b/table/sst_file_writer.cc @@ -0,0 +1,188 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include "rocksdb/sst_file_writer.h" + +#include +#include "db/dbformat.h" +#include "rocksdb/table.h" +#include "table/block_based_table_builder.h" +#include "util/file_reader_writer.h" +#include "util/string_util.h" + +namespace rocksdb { + +const std::string ExternalSstFilePropertyNames::kVersion = + "rocksdb.external_sst_file.version"; + +// PropertiesCollector used to add properties specific to tables +// generated by SstFileWriter +class SstFileWriter::SstFileWriterPropertiesCollector + : public IntTblPropCollector { + public: + explicit SstFileWriterPropertiesCollector(int32_t version) + : version_(version) {} + + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override { + // Intentionally left blank. Have no interest in collecting stats for + // individual key/value pairs. + return Status::OK(); + } + + virtual Status Finish(UserCollectedProperties* properties) override { + std::string version_val; + PutFixed32(&version_val, static_cast(version_)); + properties->insert({ExternalSstFilePropertyNames::kVersion, version_val}); + return Status::OK(); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}}; + } + + private: + int32_t version_; +}; + +class SstFileWriter::SstFileWriterPropertiesCollectorFactory + : public IntTblPropCollectorFactory { + public: + explicit SstFileWriterPropertiesCollectorFactory(int32_t version) + : version_(version) {} + + virtual IntTblPropCollector* CreateIntTblPropCollector() override { + return new SstFileWriterPropertiesCollector(version_); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + private: + int32_t version_; +}; + +struct SstFileWriter::Rep { + Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions, + const Comparator* _user_comparator) + : env_options(_env_options), + ioptions(_ioptions), + internal_comparator(_user_comparator) {} + + std::unique_ptr file_writer; + std::unique_ptr builder; + EnvOptions env_options; + ImmutableCFOptions ioptions; + InternalKeyComparator internal_comparator; + ExternalSstFileInfo file_info; +}; + +SstFileWriter::SstFileWriter(const EnvOptions& env_options, + const ImmutableCFOptions& ioptions, + const Comparator* user_comparator) + : rep_(new Rep(env_options, ioptions, user_comparator)) {} + +SstFileWriter::~SstFileWriter() { delete rep_; } + +Status SstFileWriter::Open(const std::string& file_path) { + Rep* r = rep_; + Status s; + std::unique_ptr sst_file; + s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options); + if (!s.ok()) { + return s; + } + + CompressionType compression_type = r->ioptions.compression; + if (!r->ioptions.compression_per_level.empty()) { + // Use the compression of the last level if we have per level compression + compression_type = *(r->ioptions.compression_per_level.rbegin()); + } + + std::vector> + int_tbl_prop_collector_factories; + int_tbl_prop_collector_factories.emplace_back( + new SstFileWriterPropertiesCollectorFactory(1 /* version */)); + + TableBuilderOptions table_builder_options( + r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, + compression_type, r->ioptions.compression_opts, false); + r->file_writer.reset( + new WritableFileWriter(std::move(sst_file), r->env_options)); + 
r->builder.reset(r->ioptions.table_factory->NewTableBuilder( + table_builder_options, r->file_writer.get())); + + r->file_info.file_path = file_path; + r->file_info.file_size = 0; + r->file_info.num_entries = 0; + r->file_info.sequence_number = 0; + r->file_info.version = 1; + return s; +} + +Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { + Rep* r = rep_; + if (!r->builder) { + return Status::InvalidArgument("File is not opened"); + } + + if (r->file_info.num_entries == 0) { + r->file_info.smallest_key = user_key.ToString(); + } else { + if (r->internal_comparator.user_comparator()->Compare( + user_key, r->file_info.largest_key) <= 0) { + // Make sure that keys are added in order + return Status::InvalidArgument("Keys must be added in order"); + } + } + + // update file info + r->file_info.num_entries++; + r->file_info.largest_key = user_key.ToString(); + r->file_info.file_size = r->builder->FileSize(); + + InternalKey ikey(user_key, 0 /* Sequence Number */, + ValueType::kTypeValue /* Put */); + r->builder->Add(ikey.Encode(), value); + + return Status::OK(); +} + +Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { + Rep* r = rep_; + if (!r->builder) { + return Status::InvalidArgument("File is not opened"); + } + + Status s = r->builder->Finish(); + if (s.ok()) { + if (!r->ioptions.disable_data_sync) { + s = r->file_writer->Sync(r->ioptions.use_fsync); + } + if (s.ok()) { + s = r->file_writer->Close(); + } + } else { + r->builder->Abandon(); + } + + if (!s.ok()) { + r->ioptions.env->DeleteFile(r->file_info.file_path); + } + + if (s.ok() && file_info != nullptr) { + r->file_info.file_size = r->builder->FileSize(); + *file_info = r->file_info; + } + + r->builder.reset(); + return s; +} +} // namespace rocksdb
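
For context, the end-to-end flow exercised by the new tests can be sketched as a small standalone program. The database path, sst file path, and option setup below are illustrative assumptions rather than values from this patch, and most failure handling is reduced to status propagation:

    #include <cstdio>
    #include <string>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/sst_file_writer.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;

      // Step 1: build an external sst file. Keys must be added in increasing
      // order according to the comparator; all keys get sequence number 0.
      rocksdb::SstFileWriter writer(rocksdb::EnvOptions(),
                                    rocksdb::ImmutableCFOptions(options),
                                    options.comparator);
      rocksdb::Status s = writer.Open("/tmp/external_file1.sst");  // placeholder path
      for (int k = 0; s.ok() && k < 100; k++) {
        char key[16];
        snprintf(key, sizeof(key), "key%03d", k);  // zero-padded so keys stay sorted
        s = writer.Add(key, "val" + std::to_string(k));
      }
      rocksdb::ExternalSstFileInfo file_info;
      if (s.ok()) {
        s = writer.Finish(&file_info);
      }

      // Step 2: add the file to an empty database (empty memtable, last
      // sequence number 0, no overlap with existing key ranges).
      rocksdb::DB* db = nullptr;
      if (s.ok()) {
        s = rocksdb::DB::Open(options, "/tmp/addfile_example_db", &db);  // placeholder path
      }
      if (s.ok()) {
        s = db->AddFile(&file_info, true /* move_file */);
      }
      delete db;
      return s.ok() ? 0 : 1;
    }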