From f7bb1a0060cfaeefefac9a32734956653ea7e35c Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Fri, 26 May 2017 12:05:19 -0700 Subject: [PATCH] support merge and delete in file ingestion Summary: Previously sst_file_writer only supports kTypeValue, we need kTypeMerge and kTypeDeletion also as user requested. Closes https://github.com/facebook/rocksdb/pull/2361 Differential Revision: D5139402 Pulled By: lightmark fbshipit-source-id: 092a60756d01692539d817a3765ebfd58a8d7f88 --- db/c.cc | 21 +- db/c_test.c | 12 +- db/db_merge_operator_test.cc | 22 -- db/db_test_util.h | 22 ++ db/external_sst_file_basic_test.cc | 318 ++++++++++++++++++++++++---- db/external_sst_file_test.cc | 74 +++---- include/rocksdb/c.h | 9 + include/rocksdb/sst_file_writer.h | 23 +- java/rocksjni/sst_file_writerjni.cc | 53 ++++- table/block.cc | 12 +- table/sst_file_writer.cc | 116 ++++++---- 11 files changed, 527 insertions(+), 155 deletions(-) diff --git a/db/c.cc b/db/c.cc index 3b52d2d70..cc359b2c1 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2852,7 +2852,26 @@ void rocksdb_sstfilewriter_open(rocksdb_sstfilewriter_t* writer, void rocksdb_sstfilewriter_add(rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen, const char* val, size_t vallen, char** errptr) { - SaveError(errptr, writer->rep->Add(Slice(key, keylen), Slice(val, vallen))); + SaveError(errptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen))); +} + +void rocksdb_sstfilewriter_put(rocksdb_sstfilewriter_t* writer, const char* key, + size_t keylen, const char* val, size_t vallen, + char** errptr) { + SaveError(errptr, writer->rep->Put(Slice(key, keylen), Slice(val, vallen))); +} + +void rocksdb_sstfilewriter_merge(rocksdb_sstfilewriter_t* writer, + const char* key, size_t keylen, + const char* val, size_t vallen, + char** errptr) { + SaveError(errptr, writer->rep->Merge(Slice(key, keylen), Slice(val, vallen))); +} + +void rocksdb_sstfilewriter_delete(rocksdb_sstfilewriter_t* writer, + const char* key, size_t keylen, + char** errptr) { + SaveError(errptr, writer->rep->Delete(Slice(key, keylen))); } void rocksdb_sstfilewriter_finish(rocksdb_sstfilewriter_t* writer, diff --git a/db/c_test.c b/db/c_test.c index e8b44a726..57f19aa96 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -569,11 +569,11 @@ int main(int argc, char** argv) { unlink(sstfilename); rocksdb_sstfilewriter_open(writer, sstfilename, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk1", 5, "v1", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk1", 5, "v1", 2, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk2", 5, "v2", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk2", 5, "v2", 2, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk3", 5, "v3", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk3", 5, "v3", 2, &err); CheckNoError(err); rocksdb_sstfilewriter_finish(writer, &err); CheckNoError(err); @@ -590,11 +590,11 @@ int main(int argc, char** argv) { unlink(sstfilename); rocksdb_sstfilewriter_open(writer, sstfilename, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk2", 5, "v4", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk2", 5, "v4", 2, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk22", 6, "v5", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk22", 6, "v5", 2, &err); CheckNoError(err); - rocksdb_sstfilewriter_add(writer, "sstk3", 5, "v6", 2, &err); + rocksdb_sstfilewriter_put(writer, "sstk3", 5, "v6", 2, &err); CheckNoError(err); rocksdb_sstfilewriter_finish(writer, &err); CheckNoError(err); diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index b1cce417a..7c842ff84 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -20,28 +20,6 @@ class DBMergeOperatorTest : public DBTestBase { DBMergeOperatorTest() : DBTestBase("/db_merge_operator_test") {} }; -// A test merge operator mimics put but also fails if one of merge operands is -// "corrupted". -class TestPutOperator : public MergeOperator { - public: - virtual bool FullMergeV2(const MergeOperationInput& merge_in, - MergeOperationOutput* merge_out) const override { - if (merge_in.existing_value != nullptr && - *(merge_in.existing_value) == "corrupted") { - return false; - } - for (auto value : merge_in.operand_list) { - if (value == "corrupted") { - return false; - } - } - merge_out->existing_operand = merge_in.operand_list.back(); - return true; - } - - virtual const char* Name() const override { return "TestPutOperator"; } -}; - TEST_F(DBMergeOperatorTest, MergeErrorOnRead) { Options options; options.create_if_missing = true; diff --git a/db/db_test_util.h b/db/db_test_util.h index 699f87106..f729f50c9 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -586,6 +586,28 @@ class OnFileDeletionListener : public EventListener { }; #endif +// A test merge operator mimics put but also fails if one of merge operands is +// "corrupted". +class TestPutOperator : public MergeOperator { + public: + virtual bool FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const override { + if (merge_in.existing_value != nullptr && + *(merge_in.existing_value) == "corrupted") { + return false; + } + for (auto value : merge_in.operand_list) { + if (value == "corrupted") { + return false; + } + } + merge_out->existing_operand = merge_in.operand_list.back(); + return true; + } + + virtual const char* Name() const override { return "TestPutOperator"; } +}; + class DBTestBase : public testing::Test { protected: // Sequence of option configurations to try diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 9c0d3caed..651507153 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -40,8 +40,10 @@ class ExternalSSTFileBasicTest : public DBTestBase { } Status GenerateAndAddExternalFile( - const Options options, std::vector keys, int file_id, + const Options options, std::vector keys, + const std::vector& value_types, int file_id, std::map* true_data) { + assert(value_types.size() == 1 || keys.size() == value_types.size()); std::string file_path = sst_files_dir_ + ToString(file_id); SstFileWriter sst_file_writer(EnvOptions(), options); @@ -49,11 +51,28 @@ class ExternalSSTFileBasicTest : public DBTestBase { if (!s.ok()) { return s; } - for (int k : keys) { - std::string key = Key(k); - std::string value = Key(k) + ToString(file_id); - s = sst_file_writer.Add(key, value); - (*true_data)[key] = value; + for (size_t i = 0; i < keys.size(); i++) { + std::string key = Key(keys[i]); + std::string value = Key(keys[i]) + ToString(file_id); + ValueType value_type = + (value_types.size() == 1 ? value_types[0] : value_types[i]); + switch (value_type) { + case ValueType::kTypeValue: + s = sst_file_writer.Put(key, value); + (*true_data)[key] = value; + break; + case ValueType::kTypeMerge: + s = sst_file_writer.Merge(key, value); + // we only use TestPutOperator in this test + (*true_data)[key] = value; + break; + case ValueType::kTypeDeletion: + s = sst_file_writer.Delete(key); + true_data->erase(key); + break; + default: + return Status::InvalidArgument("Value type is not supported"); + } if (!s.ok()) { sst_file_writer.Finish(); return s; @@ -66,10 +85,17 @@ class ExternalSSTFileBasicTest : public DBTestBase { ifo.allow_global_seqno = true; s = db_->IngestExternalFile({file_path}, ifo); } - return s; } + Status GenerateAndAddExternalFile( + const Options options, std::vector keys, const ValueType value_type, + int file_id, std::map* true_data) { + return GenerateAndAddExternalFile(options, keys, + std::vector(1, value_type), + file_id, true_data); + } + ~ExternalSSTFileBasicTest() { test::DestroyDir(env_, sst_files_dir_); } protected: @@ -89,7 +115,7 @@ TEST_F(ExternalSSTFileBasicTest, Basic) { std::string file1 = sst_files_dir_ + "file1.sst"; ASSERT_OK(sst_file_writer.Open(file1)); for (int k = 0; k < 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file1_info; Status s = sst_file_writer.Finish(&file1_info); @@ -103,7 +129,7 @@ TEST_F(ExternalSSTFileBasicTest, Basic) { ASSERT_EQ(file1_info.smallest_key, Key(0)); ASSERT_EQ(file1_info.largest_key, Key(99)); // sst_file_writer already finished, cannot add this value - s = sst_file_writer.Add(Key(100), "bad_val"); + s = sst_file_writer.Put(Key(100), "bad_val"); ASSERT_FALSE(s.ok()) << s.ToString(); DestroyAndReopen(options); @@ -128,7 +154,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) { std::string file1 = sst_files_dir_ + "file1.sst"; ASSERT_OK(sst_file_writer.Open(file1)); for (int k = 0; k < 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file1_info; Status s = sst_file_writer.Finish(&file1_info); @@ -142,7 +168,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) { std::string file2 = sst_files_dir_ + "file2.sst"; ASSERT_OK(sst_file_writer.Open(file2)); for (int k = 100; k < 300; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file2_info; s = sst_file_writer.Finish(&file2_info); @@ -156,7 +182,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) { std::string file3 = sst_files_dir_ + "file3.sst"; ASSERT_OK(sst_file_writer.Open(file3)); for (int k = 110; k < 125; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); } ExternalSstFileInfo file3_info; s = sst_file_writer.Finish(&file3_info); @@ -191,33 +217,36 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) { int file_id = 1; - ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, file_id++, + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, + ValueType::kTypeValue, file_id++, &true_data)); // File dont overwrite any keys, No seqno needed ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); - ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, file_id++, + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, + ValueType::kTypeValue, file_id++, + &true_data)); // File dont overwrite any keys, No seqno needed ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); - ASSERT_OK( - GenerateAndAddExternalFile(options, {1, 4, 6}, file_id++, &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 4, 6}, ValueType::kTypeValue, file_id++, &true_data)); // File overwrite some keys, a seqno will be assigned ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); - ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, - &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {11, 15, 19}, ValueType::kTypeValue, file_id++, &true_data)); // File overwrite some keys, a seqno will be assigned ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); - ASSERT_OK( - GenerateAndAddExternalFile(options, {120, 130}, file_id++, &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {120, 130}, ValueType::kTypeValue, file_id++, &true_data)); // File dont overwrite any keys, No seqno needed ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); - ASSERT_OK( - GenerateAndAddExternalFile(options, {1, 130}, file_id++, &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 130}, ValueType::kTypeValue, file_id++, &true_data)); // File overwrite some keys, a seqno will be assigned ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); @@ -228,17 +257,115 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) { } SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); - ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, - &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data)); // File dont overwrite any keys, No seqno needed ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); - ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, + ASSERT_OK(GenerateAndAddExternalFile( + options, {40, 41, 42}, ValueType::kTypeValue, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {20, 30, 40}, ValueType::kTypeValue, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); + + const Snapshot* snapshot = db_->GetSnapshot(); + + // We will need a seqno for the file regardless if the file overwrite + // keys in the DB or not because we have a snapshot + ASSERT_OK(GenerateAndAddExternalFile( + options, {1000, 1002}, ValueType::kTypeValue, file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {2000, 3002}, ValueType::kTypeValue, file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150}, + ValueType::kTypeValue, file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + db_->ReleaseSnapshot(snapshot); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {5000, 5001}, ValueType::kTypeValue, file_id++, &true_data)); + // No snapshot anymore, no need to assign a seqno + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); + } while (ChangeCompactOptions()); +} + +TEST_F(ExternalSSTFileBasicTest, IngestFileWithMultipleValueType) { + do { + Options options = CurrentOptions(); + options.merge_operator.reset(new TestPutOperator()); + DestroyAndReopen(options); + std::map true_data; + + int file_id = 1; + + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2, 3, 4, 5, 6}, + ValueType::kTypeValue, file_id++, + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile(options, {10, 11, 12, 13}, + ValueType::kTypeValue, file_id++, + + &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 4, 6}, ValueType::kTypeMerge, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); + + ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, + ValueType::kTypeDeletion, file_id++, + &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {120, 130}, ValueType::kTypeMerge, file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 130}, ValueType::kTypeDeletion, file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); + + // Write some keys through normal write path + for (int i = 0; i < 50; i++) { + ASSERT_OK(Put(Key(i), "memtable")); + true_data[Key(i)] = "memtable"; + } + SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {60, 61, 62}, ValueType::kTypeValue, file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {40, 41, 42}, ValueType::kTypeMerge, file_id++, &true_data)); // File overwrite some keys, a seqno will be assigned ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); - ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, + ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, + ValueType::kTypeDeletion, file_id++, &true_data)); // File overwrite some keys, a seqno will be assigned ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); @@ -247,25 +374,143 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) { // We will need a seqno for the file regardless if the file overwrite // keys in the DB or not because we have a snapshot - ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, - &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {1000, 1002}, ValueType::kTypeMerge, file_id++, &true_data)); // A global seqno will be assigned anyway because of the snapshot ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); - ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, - &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {2000, 3002}, ValueType::kTypeMerge, file_id++, &true_data)); // A global seqno will be assigned anyway because of the snapshot ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); ASSERT_OK(GenerateAndAddExternalFile(options, {1, 20, 40, 100, 150}, - file_id++, &true_data)); + ValueType::kTypeMerge, file_id++, + &true_data)); // A global seqno will be assigned anyway because of the snapshot ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); db_->ReleaseSnapshot(snapshot); - ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, - &true_data)); + ASSERT_OK(GenerateAndAddExternalFile( + options, {5000, 5001}, ValueType::kTypeValue, file_id++, &true_data)); + // No snapshot anymore, no need to assign a seqno + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + size_t kcnt = 0; + VerifyDBFromMap(true_data, &kcnt, false); + } while (ChangeCompactOptions()); +} + +TEST_F(ExternalSSTFileBasicTest, IngestFileWithMixedValueType) { + do { + Options options = CurrentOptions(); + options.merge_operator.reset(new TestPutOperator()); + DestroyAndReopen(options); + std::map true_data; + + int file_id = 1; + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 2, 3, 4, 5, 6}, + {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue, + ValueType::kTypeMerge, ValueType::kTypeValue, ValueType::kTypeMerge}, + file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {10, 11, 12, 13}, + {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue, + ValueType::kTypeMerge}, + file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 0); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 4, 6}, {ValueType::kTypeDeletion, ValueType::kTypeValue, + ValueType::kTypeMerge}, + file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {11, 15, 19}, {ValueType::kTypeDeletion, ValueType::kTypeMerge, + ValueType::kTypeValue}, + file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {120, 130}, {ValueType::kTypeValue, ValueType::kTypeMerge}, + file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 130}, {ValueType::kTypeMerge, ValueType::kTypeDeletion}, + file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3); + + // Write some keys through normal write path + for (int i = 0; i < 50; i++) { + ASSERT_OK(Put(Key(i), "memtable")); + true_data[Key(i)] = "memtable"; + } + SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber(); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {60, 61, 62}, + {ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeValue}, + file_id++, &true_data)); + // File dont overwrite any keys, No seqno needed + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {40, 41, 42}, {ValueType::kTypeValue, ValueType::kTypeDeletion, + ValueType::kTypeDeletion}, + file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {20, 30, 40}, + {ValueType::kTypeDeletion, ValueType::kTypeDeletion, + ValueType::kTypeDeletion}, + file_id++, &true_data)); + // File overwrite some keys, a seqno will be assigned + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2); + + const Snapshot* snapshot = db_->GetSnapshot(); + + // We will need a seqno for the file regardless if the file overwrite + // keys in the DB or not because we have a snapshot + ASSERT_OK(GenerateAndAddExternalFile( + options, {1000, 1002}, {ValueType::kTypeValue, ValueType::kTypeMerge}, + file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {2000, 3002}, {ValueType::kTypeValue, ValueType::kTypeMerge}, + file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {1, 20, 40, 100, 150}, + {ValueType::kTypeDeletion, ValueType::kTypeDeletion, + ValueType::kTypeValue, ValueType::kTypeMerge, ValueType::kTypeMerge}, + file_id++, &true_data)); + // A global seqno will be assigned anyway because of the snapshot + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); + + db_->ReleaseSnapshot(snapshot); + + ASSERT_OK(GenerateAndAddExternalFile( + options, {5000, 5001}, {ValueType::kTypeValue, ValueType::kTypeMerge}, + file_id++, &true_data)); // No snapshot anymore, no need to assign a seqno ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5); @@ -280,7 +525,7 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) { size_t total_fadvised_bytes = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "SstFileWriter::InvalidatePageCache", [&](void* arg) { + "SstFileWriter::Rep::InvalidatePageCache", [&](void* arg) { size_t fadvise_size = *(reinterpret_cast(arg)); total_fadvised_bytes += fadvise_size; }); @@ -293,19 +538,18 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) { new SstFileWriter(EnvOptions(), options, nullptr, false)); ASSERT_OK(sst_file_writer->Open(sst_file_path)); for (int i = 0; i < kNumKeys; i++) { - ASSERT_OK(sst_file_writer->Add(Key(i), Key(i))); + ASSERT_OK(sst_file_writer->Put(Key(i), Key(i))); } ASSERT_OK(sst_file_writer->Finish()); // fadvise disabled ASSERT_EQ(total_fadvised_bytes, 0); - sst_file_path = sst_files_dir_ + "file_fadvise_enable.sst"; sst_file_writer.reset( new SstFileWriter(EnvOptions(), options, nullptr, true)); ASSERT_OK(sst_file_writer->Open(sst_file_path)); for (int i = 0; i < kNumKeys; i++) { - ASSERT_OK(sst_file_writer->Add(Key(i), Key(i))); + ASSERT_OK(sst_file_writer->Put(Key(i), Key(i))); } ASSERT_OK(sst_file_writer->Finish()); // fadvise enabled diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 74d969473..be31d4874 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -63,7 +63,7 @@ class ExternalSSTFileTest : public DBTestBase { return s; } for (auto& entry : data) { - s = sst_file_writer.Add(entry.first, entry.second); + s = sst_file_writer.Put(entry.first, entry.second); if (!s.ok()) { sst_file_writer.Finish(); return s; @@ -125,7 +125,7 @@ class ExternalSSTFileTest : public DBTestBase { return s; } for (auto& entry : data) { - s = sst_file_writer.Add(entry.first, entry.second); + s = sst_file_writer.Put(entry.first, entry.second); if (!s.ok()) { sst_file_writer.Finish(); return s; @@ -211,7 +211,7 @@ TEST_F(ExternalSSTFileTest, Basic) { std::string file1 = sst_files_dir_ + "file1.sst"; ASSERT_OK(sst_file_writer.Open(file1)); for (int k = 0; k < 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file1_info; Status s = sst_file_writer.Finish(&file1_info); @@ -225,17 +225,17 @@ TEST_F(ExternalSSTFileTest, Basic) { ASSERT_EQ(file1_info.smallest_key, Key(0)); ASSERT_EQ(file1_info.largest_key, Key(99)); // sst_file_writer already finished, cannot add this value - s = sst_file_writer.Add(Key(100), "bad_val"); + s = sst_file_writer.Put(Key(100), "bad_val"); ASSERT_FALSE(s.ok()) << s.ToString(); // file2.sst (100 => 199) std::string file2 = sst_files_dir_ + "file2.sst"; ASSERT_OK(sst_file_writer.Open(file2)); for (int k = 100; k < 200; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } // Cannot add this key because it's not after last added key - s = sst_file_writer.Add(Key(99), "bad_val"); + s = sst_file_writer.Put(Key(99), "bad_val"); ASSERT_FALSE(s.ok()) << s.ToString(); ExternalSstFileInfo file2_info; s = sst_file_writer.Finish(&file2_info); @@ -250,7 +250,7 @@ TEST_F(ExternalSSTFileTest, Basic) { std::string file3 = sst_files_dir_ + "file3.sst"; ASSERT_OK(sst_file_writer.Open(file3)); for (int k = 195; k < 300; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); } ExternalSstFileInfo file3_info; s = sst_file_writer.Finish(&file3_info); @@ -268,7 +268,7 @@ TEST_F(ExternalSSTFileTest, Basic) { std::string file4 = sst_files_dir_ + "file4.sst"; ASSERT_OK(sst_file_writer.Open(file4)); for (int k = 30; k < 40; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); } ExternalSstFileInfo file4_info; s = sst_file_writer.Finish(&file4_info); @@ -282,7 +282,7 @@ TEST_F(ExternalSSTFileTest, Basic) { std::string file5 = sst_files_dir_ + "file5.sst"; ASSERT_OK(sst_file_writer.Open(file5)); for (int k = 400; k < 500; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file5_info; s = sst_file_writer.Finish(&file5_info); @@ -444,7 +444,7 @@ TEST_F(ExternalSSTFileTest, AddList) { std::string file1 = sst_files_dir_ + "file1.sst"; ASSERT_OK(sst_file_writer.Open(file1)); for (int k = 0; k < 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file1_info; Status s = sst_file_writer.Finish(&file1_info); @@ -454,17 +454,17 @@ TEST_F(ExternalSSTFileTest, AddList) { ASSERT_EQ(file1_info.smallest_key, Key(0)); ASSERT_EQ(file1_info.largest_key, Key(99)); // sst_file_writer already finished, cannot add this value - s = sst_file_writer.Add(Key(100), "bad_val"); + s = sst_file_writer.Put(Key(100), "bad_val"); ASSERT_FALSE(s.ok()) << s.ToString(); // file2.sst (100 => 199) std::string file2 = sst_files_dir_ + "file2.sst"; ASSERT_OK(sst_file_writer.Open(file2)); for (int k = 100; k < 200; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } // Cannot add this key because it's not after last added key - s = sst_file_writer.Add(Key(99), "bad_val"); + s = sst_file_writer.Put(Key(99), "bad_val"); ASSERT_FALSE(s.ok()) << s.ToString(); ExternalSstFileInfo file2_info; s = sst_file_writer.Finish(&file2_info); @@ -479,7 +479,7 @@ TEST_F(ExternalSSTFileTest, AddList) { std::string file3 = sst_files_dir_ + "file3.sst"; ASSERT_OK(sst_file_writer.Open(file3)); for (int k = 195; k < 200; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); } ExternalSstFileInfo file3_info; s = sst_file_writer.Finish(&file3_info); @@ -494,7 +494,7 @@ TEST_F(ExternalSSTFileTest, AddList) { std::string file4 = sst_files_dir_ + "file4.sst"; ASSERT_OK(sst_file_writer.Open(file4)); for (int k = 30; k < 40; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); } ExternalSstFileInfo file4_info; s = sst_file_writer.Finish(&file4_info); @@ -508,7 +508,7 @@ TEST_F(ExternalSSTFileTest, AddList) { std::string file5 = sst_files_dir_ + "file5.sst"; ASSERT_OK(sst_file_writer.Open(file5)); for (int k = 200; k < 300; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file5_info; s = sst_file_writer.Finish(&file5_info); @@ -639,7 +639,7 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) { files[i] = sst_files_dir_ + "file" + std::to_string(i) + ".sst"; ASSERT_OK(sst_file_writer.Open(files[i])); for (int k = i * 100; k < (i + 1) * 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } Status s = sst_file_writer.Finish(&files_info[i]); ASSERT_TRUE(s.ok()) << s.ToString(); @@ -676,7 +676,7 @@ TEST_F(ExternalSSTFileTest, PurgeObsoleteFilesBug) { ASSERT_OK(s); for (int i = 0; i < 500; i++) { std::string k = Key(i); - s = sst_file_writer.Add(k, k + "_val"); + s = sst_file_writer.Put(k, k + "_val"); ASSERT_OK(s); } @@ -719,7 +719,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) { std::string file1 = sst_files_dir_ + "file1.sst"; ASSERT_OK(sst_file_writer.Open(file1)); for (int k = 0; k < 100; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file1_info; Status s = sst_file_writer.Finish(&file1_info); @@ -733,7 +733,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) { std::string file2 = sst_files_dir_ + "file2.sst"; ASSERT_OK(sst_file_writer.Open(file2)); for (int k = 100; k < 300; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file2_info; s = sst_file_writer.Finish(&file2_info); @@ -763,7 +763,7 @@ TEST_F(ExternalSSTFileTest, SkipSnapshot) { std::string file3 = sst_files_dir_ + "file3.sst"; ASSERT_OK(sst_file_writer.Open(file3)); for (int k = 300; k < 400; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val")); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); } ExternalSstFileInfo file3_info; s = sst_file_writer.Finish(&file3_info); @@ -809,7 +809,7 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) { ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); for (int k = range_start; k < range_end; k++) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k))); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); } Status s = sst_file_writer.Finish(); @@ -960,7 +960,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) { std::string file_name = sst_files_dir_ + env_->GenerateUniqueId(); ASSERT_OK(sst_file_writer.Open(file_name)); for (int k = range_start; k <= range_end; k++) { - s = sst_file_writer.Add(Key(k), range_val); + s = sst_file_writer.Put(Key(k), range_val); ASSERT_OK(s); } ExternalSstFileInfo file_info; @@ -1358,7 +1358,7 @@ TEST_F(ExternalSSTFileTest, AddExternalSstFileWithCustomCompartor) { int range_end = i * 10; int range_start = range_end + 15; for (int k = (range_start - 1); k >= range_end; k--) { - ASSERT_OK(sst_file_writer.Add(Key(k), Key(k))); + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k))); } ExternalSstFileInfo file_info; ASSERT_OK(sst_file_writer.Finish(&file_info)); @@ -1448,12 +1448,12 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { std::string suffix(100, 'X'); ASSERT_OK(sst_file_writer.Open(file_path)); - ASSERT_OK(sst_file_writer.Add("A" + suffix, "VAL")); - ASSERT_OK(sst_file_writer.Add("BB" + suffix, "VAL")); - ASSERT_OK(sst_file_writer.Add("CC" + suffix, "VAL")); - ASSERT_OK(sst_file_writer.Add("CXD" + suffix, "VAL")); - ASSERT_OK(sst_file_writer.Add("CZZZ" + suffix, "VAL")); - ASSERT_OK(sst_file_writer.Add("ZAAAX" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("A" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("BB" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("CC" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("CXD" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("CZZZ" + suffix, "VAL")); + ASSERT_OK(sst_file_writer.Put("ZAAAX" + suffix, "VAL")); ASSERT_OK(sst_file_writer.Finish()); ASSERT_OK(DeprecatedAddFile({file_path})); @@ -1748,22 +1748,22 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) { // default_cf.sst const std::string cf_default_sst = sst_files_dir_ + "/default_cf.sst"; ASSERT_OK(sfw_default.Open(cf_default_sst)); - ASSERT_OK(sfw_default.Add("K1", "V1")); - ASSERT_OK(sfw_default.Add("K2", "V2")); + ASSERT_OK(sfw_default.Put("K1", "V1")); + ASSERT_OK(sfw_default.Put("K2", "V2")); ASSERT_OK(sfw_default.Finish()); // cf1.sst const std::string cf1_sst = sst_files_dir_ + "/cf1.sst"; ASSERT_OK(sfw_cf1.Open(cf1_sst)); - ASSERT_OK(sfw_cf1.Add("K3", "V1")); - ASSERT_OK(sfw_cf1.Add("K4", "V2")); + ASSERT_OK(sfw_cf1.Put("K3", "V1")); + ASSERT_OK(sfw_cf1.Put("K4", "V2")); ASSERT_OK(sfw_cf1.Finish()); // cf_unknown.sst const std::string unknown_sst = sst_files_dir_ + "/cf_unknown.sst"; ASSERT_OK(sfw_unknown.Open(unknown_sst)); - ASSERT_OK(sfw_unknown.Add("K5", "V1")); - ASSERT_OK(sfw_unknown.Add("K6", "V2")); + ASSERT_OK(sfw_unknown.Put("K5", "V1")); + ASSERT_OK(sfw_unknown.Put("K6", "V2")); ASSERT_OK(sfw_unknown.Finish()); IngestExternalFileOptions ifo; @@ -1864,7 +1864,7 @@ TEST_F(ExternalSSTFileTest, SnapshotInconsistencyBug) { SstFileWriter sst_file_writer(EnvOptions(), options); ASSERT_OK(sst_file_writer.Open(sst_file_path)); for (int i = 0; i < kNumKeys; i++) { - ASSERT_OK(sst_file_writer.Add(Key(i), Key(i) + "_V2")); + ASSERT_OK(sst_file_writer.Put(Key(i), Key(i) + "_V2")); } ASSERT_OK(sst_file_writer.Finish()); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 24fec04a5..c768210e3 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -1117,6 +1117,15 @@ extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_open( extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_add( rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen, const char* val, size_t vallen, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_put( + rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen, + const char* val, size_t vallen, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_merge( + rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen, + const char* val, size_t vallen, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_delete( + rocksdb_sstfilewriter_t* writer, const char* key, size_t keylen, + char** errptr); extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_finish( rocksdb_sstfilewriter_t* writer, char** errptr); extern ROCKSDB_LIBRARY_API void rocksdb_sstfilewriter_destroy( diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index 066c6f316..e7683c25a 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -17,6 +17,12 @@ #include "rocksdb/table_properties.h" #include "rocksdb/types.h" +#if defined(__GNUC__) || defined(__clang__) +#define ROCKSDB_DEPRECATED_FUNC __attribute__((__deprecated__)) +#elif _WIN32 +#define ROCKSDB_DEPRECATED_FUNC __declspec(deprecated) +#endif + namespace rocksdb { class Comparator; @@ -76,9 +82,21 @@ class SstFileWriter { // Prepare SstFileWriter to write into file located at "file_path". Status Open(const std::string& file_path); - // Add key, value to currently opened file + // Add a Put key with value to currently opened file (deprecated) + // REQUIRES: key is after any previously added key according to comparator. + ROCKSDB_DEPRECATED_FUNC Status Add(const Slice& user_key, const Slice& value); + + // Add a Put key with value to currently opened file + // REQUIRES: key is after any previously added key according to comparator. + Status Put(const Slice& user_key, const Slice& value); + + // Add a Merge key with value to currently opened file + // REQUIRES: key is after any previously added key according to comparator. + Status Merge(const Slice& user_key, const Slice& value); + + // Add a deletion key to currently opened file // REQUIRES: key is after any previously added key according to comparator. - Status Add(const Slice& user_key, const Slice& value); + Status Delete(const Slice& user_key); // Finalize writing to sst file and close file. // @@ -91,7 +109,6 @@ class SstFileWriter { private: void InvalidatePageCache(bool closing); - struct Rep; std::unique_ptr rep_; }; diff --git a/java/rocksjni/sst_file_writerjni.cc b/java/rocksjni/sst_file_writerjni.cc index 97f3351c7..84bce5c0f 100644 --- a/java/rocksjni/sst_file_writerjni.cc +++ b/java/rocksjni/sst_file_writerjni.cc @@ -83,13 +83,64 @@ void Java_org_rocksdb_SstFileWriter_add(JNIEnv *env, jobject jobj, auto *key_slice = reinterpret_cast(jkey_handle); auto *value_slice = reinterpret_cast(jvalue_handle); rocksdb::Status s = - reinterpret_cast(jhandle)->Add(*key_slice, + reinterpret_cast(jhandle)->Put(*key_slice, *value_slice); if (!s.ok()) { rocksdb::RocksDBExceptionJni::ThrowNew(env, s); } } +/* + * Class: org_rocksdb_SstFileWriter + * Method: put + * Signature: (JJJ)V + */ +void Java_org_rocksdb_SstFileWriter_put(JNIEnv *env, jobject jobj, + jlong jhandle, jlong jkey_handle, + jlong jvalue_handle) { + auto *key_slice = reinterpret_cast(jkey_handle); + auto *value_slice = reinterpret_cast(jvalue_handle); + rocksdb::Status s = + reinterpret_cast(jhandle)->Put(*key_slice, + *value_slice); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_SstFileWriter + * Method: merge + * Signature: (JJJ)V + */ +void Java_org_rocksdb_SstFileWriter_merge(JNIEnv *env, jobject jobj, + jlong jhandle, jlong jkey_handle, + jlong jvalue_handle) { + auto *key_slice = reinterpret_cast(jkey_handle); + auto *value_slice = reinterpret_cast(jvalue_handle); + rocksdb::Status s = + reinterpret_cast(jhandle)->Merge(*key_slice, + *value_slice); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + +/* + * Class: org_rocksdb_SstFileWriter + * Method: delete + * Signature: (JJJ)V + */ +void Java_org_rocksdb_SstFileWriter_delete(JNIEnv *env, jobject jobj, + jlong jhandle, jlong jkey_handle) { + auto *key_slice = reinterpret_cast(jkey_handle); + rocksdb::Status s = + reinterpret_cast(jhandle)->Delete(*key_slice); + if (!s.ok()) { + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + } +} + /* * Class: org_rocksdb_SstFileWriter * Method: finish diff --git a/table/block.cc b/table/block.cc index 693451e5e..2d40c3fce 100644 --- a/table/block.cc +++ b/table/block.cc @@ -246,10 +246,14 @@ bool BlockIter::ParseNextKey() { if (global_seqno_ != kDisableGlobalSequenceNumber) { // If we are reading a file with a global sequence number we should - // expect that all encoded sequence numbers are zeros and all value - // types are kTypeValue + // expect that all encoded sequence numbers are zeros and any value + // type is kTypeValue, kTypeMerge or kTypeDeletion assert(GetInternalKeySeqno(key_.GetInternalKey()) == 0); - assert(ExtractValueType(key_.GetInternalKey()) == ValueType::kTypeValue); + + ValueType value_type = ExtractValueType(key_.GetInternalKey()); + assert(value_type == ValueType::kTypeValue || + value_type == ValueType::kTypeMerge || + value_type == ValueType::kTypeDeletion); if (key_pinned_) { // TODO(tec): Investigate updating the seqno in the loaded block @@ -261,7 +265,7 @@ bool BlockIter::ParseNextKey() { key_pinned_ = false; } - key_.UpdateInternalKey(global_seqno_, ValueType::kTypeValue); + key_.UpdateInternalKey(global_seqno_, value_type); } value_ = Slice(p + non_shared, value_length); diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 0aa9a97ec..5d7a46c60 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -56,6 +56,67 @@ struct SstFileWriter::Rep { // The size of the file during the last time we called Fadvise to remove // cached pages from page cache. uint64_t last_fadvise_size; + Status Add(const Slice& user_key, const Slice& value, + const ValueType value_type) { + if (!builder) { + return Status::InvalidArgument("File is not opened"); + } + + if (file_info.num_entries == 0) { + file_info.smallest_key.assign(user_key.data(), user_key.size()); + } else { + if (internal_comparator.user_comparator()->Compare( + user_key, file_info.largest_key) <= 0) { + // Make sure that keys are added in order + return Status::InvalidArgument("Keys must be added in order"); + } + } + + // TODO(tec) : For external SST files we could omit the seqno and type. + switch (value_type) { + case ValueType::kTypeValue: + ikey.Set(user_key, 0 /* Sequence Number */, + ValueType::kTypeValue /* Put */); + break; + case ValueType::kTypeMerge: + ikey.Set(user_key, 0 /* Sequence Number */, + ValueType::kTypeMerge /* Merge */); + break; + case ValueType::kTypeDeletion: + ikey.Set(user_key, 0 /* Sequence Number */, + ValueType::kTypeDeletion /* Delete */); + break; + default: + return Status::InvalidArgument("Value type is not supported"); + } + builder->Add(ikey.Encode(), value); + + // update file info + file_info.num_entries++; + file_info.largest_key.assign(user_key.data(), user_key.size()); + file_info.file_size = builder->FileSize(); + + InvalidatePageCache(false /* closing */); + + return Status::OK(); + } + + void InvalidatePageCache(bool closing) { + if (invalidate_page_cache == false) { + // Fadvise disabled + return; + } + uint64_t bytes_since_last_fadvise = + builder->FileSize() - last_fadvise_size; + if (bytes_since_last_fadvise > kFadviseTrigger || closing) { + TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache", + &(bytes_since_last_fadvise)); + // Tell the OS that we dont need this file in page cache + file_writer->InvalidateCache(0, 0); + last_fadvise_size = builder->FileSize(); + } + } + }; SstFileWriter::SstFileWriter(const EnvOptions& env_options, @@ -149,34 +210,19 @@ Status SstFileWriter::Open(const std::string& file_path) { } Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { - Rep* r = rep_.get(); - if (!r->builder) { - return Status::InvalidArgument("File is not opened"); - } - - if (r->file_info.num_entries == 0) { - r->file_info.smallest_key.assign(user_key.data(), user_key.size()); - } else { - if (r->internal_comparator.user_comparator()->Compare( - user_key, r->file_info.largest_key) <= 0) { - // Make sure that keys are added in order - return Status::InvalidArgument("Keys must be added in order"); - } - } - - // TODO(tec) : For external SST files we could omit the seqno and type. - r->ikey.Set(user_key, 0 /* Sequence Number */, - ValueType::kTypeValue /* Put */); - r->builder->Add(r->ikey.Encode(), value); + return rep_->Add(user_key, value, ValueType::kTypeValue); +} - // Update file info - r->file_info.num_entries++; - r->file_info.largest_key.assign(user_key.data(), user_key.size()); - r->file_info.file_size = r->builder->FileSize(); +Status SstFileWriter::Put(const Slice& user_key, const Slice& value) { + return rep_->Add(user_key, value, ValueType::kTypeValue); +} - InvalidatePageCache(false /* closing */); +Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) { + return rep_->Add(user_key, value, ValueType::kTypeMerge); +} - return Status::OK(); +Status SstFileWriter::Delete(const Slice& user_key) { + return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion); } Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { @@ -193,7 +239,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { if (s.ok()) { s = r->file_writer->Sync(r->ioptions.use_fsync); - InvalidatePageCache(true /* closing */); + r->InvalidatePageCache(true /* closing */); if (s.ok()) { s = r->file_writer->Close(); } @@ -210,24 +256,6 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) { return s; } -void SstFileWriter::InvalidatePageCache(bool closing) { - Rep* r = rep_.get(); - if (r->invalidate_page_cache == false) { - // Fadvise disabled - return; - } - - uint64_t bytes_since_last_fadvise = - r->builder->FileSize() - r->last_fadvise_size; - if (bytes_since_last_fadvise > kFadviseTrigger || closing) { - TEST_SYNC_POINT_CALLBACK("SstFileWriter::InvalidatePageCache", - &(bytes_since_last_fadvise)); - // Tell the OS that we dont need this file in page cache - r->file_writer->InvalidateCache(0, 0); - r->last_fadvise_size = r->builder->FileSize(); - } -} - uint64_t SstFileWriter::FileSize() { return rep_->file_info.file_size; }