From 14f413760246685b376166209eba1a3324d706e4 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Fri, 24 Jul 2015 16:29:05 -0700 Subject: [PATCH 1/8] Correct the comment of DB::GetApproximateSizes Summary: Correct the comment of DB::GetApproximateSizes Test Plan: no code change Reviewers: igor, anthony, IslamAbdelRahman, kradhakrishnan, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42939 --- include/rocksdb/db.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 5ad1a390f..bcd170796 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -405,7 +405,9 @@ class DB { // if the user data compresses by a factor of ten, the returned // sizes will be one-tenth the size of the corresponding user data size. // - // The results may not include the sizes of recently written data. + // If include_memtable is set to true, then the result will also + // include those recently written data in the mem-tables if + // the mem-table type supports it. virtual void GetApproximateSizes(ColumnFamilyHandle* column_family, const Range* range, int n, uint64_t* sizes, bool include_memtable = false) = 0; From f73c801432bed1482b52ae8efdc699b00d7fcbdf Mon Sep 17 00:00:00 2001 From: Andres Noetzli Date: Fri, 24 Jul 2015 16:56:26 -0700 Subject: [PATCH 2/8] Fixing Java tests. Summary: While working on https://reviews.facebook.net/D43017 , I realized that some Java tests are failing due to a deprecated option. 
This patch removes the offending tests, adds @Deprecated annotations to the Java interface and removes the corresponding functions in rocksjni Test Plan: make jtest (all tests are passing now) Reviewers: rven, igor, sdong, anthony, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D43035 --- java/rocksjni/options.cc | 18 -------------- .../java/org/rocksdb/ColumnFamilyOptions.java | 6 +---- .../rocksdb/ColumnFamilyOptionsInterface.java | 24 +++++++------------ java/src/main/java/org/rocksdb/Options.java | 6 +---- .../org/rocksdb/ColumnFamilyOptionsTest.java | 15 ------------ .../test/java/org/rocksdb/OptionsTest.java | 15 ------------ java/src/test/java/org/rocksdb/TtlDBTest.java | 6 ++--- 7 files changed, 12 insertions(+), 78 deletions(-) diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 724fb8810..ef5b4bffd 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -1247,24 +1247,6 @@ void Java_org_rocksdb_Options_setLevelZeroStopWritesTrigger( static_cast(jlevel0_stop_writes_trigger); } -/* - * Class: org_rocksdb_Options - * Method: maxMemCompactionLevel - * Signature: (J)I - */ -jint Java_org_rocksdb_Options_maxMemCompactionLevel( - JNIEnv* env, jobject jobj, jlong jhandle) { - return 0; -} - -/* - * Class: org_rocksdb_Options - * Method: setMaxMemCompactionLevel - * Signature: (JI)V - */ -void Java_org_rocksdb_Options_setMaxMemCompactionLevel( - JNIEnv* env, jobject jobj, jlong jhandle, jint jmax_mem_compaction_level) {} - /* * Class: org_rocksdb_Options * Method: targetFileSizeBase diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index c916ccbd8..cdb97167c 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -276,13 +276,12 @@ public class ColumnFamilyOptions extends RocksObject @Override public 
ColumnFamilyOptions setMaxMemCompactionLevel( final int maxMemCompactionLevel) { - setMaxMemCompactionLevel(nativeHandle_, maxMemCompactionLevel); return this; } @Override public int maxMemCompactionLevel() { - return maxMemCompactionLevel(nativeHandle_); + return 0; } @Override @@ -708,9 +707,6 @@ public class ColumnFamilyOptions extends RocksObject private native void setLevelZeroStopWritesTrigger( long handle, int numFiles); private native int levelZeroStopWritesTrigger(long handle); - private native void setMaxMemCompactionLevel( - long handle, int maxMemCompactionLevel); - private native int maxMemCompactionLevel(long handle); private native void setTargetFileSizeBase( long handle, long targetFileSizeBase); private native long targetFileSizeBase(long handle); diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java index 5eaef4edd..ad296f111 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptionsInterface.java @@ -381,30 +381,22 @@ public interface ColumnFamilyOptionsInterface { int levelZeroStopWritesTrigger(); /** - * The highest level to which a new compacted memtable is pushed if it - * does not create overlap. We try to push to level 2 to avoid the - * relatively expensive level 0≥1 compactions and to avoid some - * expensive manifest file operations. We do not push all the way to - * the largest level since that can generate a lot of wasted disk - * space if the same key space is being repeatedly overwritten. - * - * @param maxMemCompactionLevel the highest level to which a new compacted - * mem-table will be pushed. + * This does nothing anymore. Deprecated. + * + * @param maxMemCompactionLevel Unused. + * * @return the reference to the current option. 
*/ + @Deprecated Object setMaxMemCompactionLevel( int maxMemCompactionLevel); /** - * The highest level to which a new compacted memtable is pushed if it - * does not create overlap. We try to push to level 2 to avoid the - * relatively expensive level 0≥1 compactions and to avoid some - * expensive manifest file operations. We do not push all the way to - * the largest level since that can generate a lot of wasted disk - * space if the same key space is being repeatedly overwritten. + * This does nothing anymore. Deprecated. * - * @return the highest level where a new compacted memtable will be pushed. + * @return Always returns 0. */ + @Deprecated int maxMemCompactionLevel(); /** diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index fdd7fe903..92efc7887 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -763,13 +763,12 @@ public class Options extends RocksObject @Override public int maxMemCompactionLevel() { - return maxMemCompactionLevel(nativeHandle_); + return 0; } @Override public Options setMaxMemCompactionLevel( final int maxMemCompactionLevel) { - setMaxMemCompactionLevel(nativeHandle_, maxMemCompactionLevel); return this; } @@ -1227,9 +1226,6 @@ public class Options extends RocksObject private native void setLevelZeroStopWritesTrigger( long handle, int numFiles); private native int levelZeroStopWritesTrigger(long handle); - private native void setMaxMemCompactionLevel( - long handle, int maxMemCompactionLevel); - private native int maxMemCompactionLevel(long handle); private native void setTargetFileSizeBase( long handle, long targetFileSizeBase); private native long targetFileSizeBase(long handle); diff --git a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java index 531a1d0f8..1a0b4df3b 100644 --- a/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java +++ 
b/java/src/test/java/org/rocksdb/ColumnFamilyOptionsTest.java @@ -180,21 +180,6 @@ public class ColumnFamilyOptionsTest { } } - @Test - public void maxMemCompactionLevel() { - ColumnFamilyOptions opt = null; - try { - opt = new ColumnFamilyOptions(); - int intValue = rand.nextInt(); - opt.setMaxMemCompactionLevel(intValue); - assertThat(opt.maxMemCompactionLevel()).isEqualTo(intValue); - } finally { - if (opt != null) { - opt.dispose(); - } - } - } - @Test public void targetFileSizeBase() { ColumnFamilyOptions opt = null; diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index 5f950eaa2..611e37c51 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -142,21 +142,6 @@ public class OptionsTest { } } - @Test - public void maxMemCompactionLevel() { - Options opt = null; - try { - opt = new Options(); - int intValue = rand.nextInt(); - opt.setMaxMemCompactionLevel(intValue); - assertThat(opt.maxMemCompactionLevel()).isEqualTo(intValue); - } finally { - if (opt != null) { - opt.dispose(); - } - } - } - @Test public void targetFileSizeBase() { Options opt = null; diff --git a/java/src/test/java/org/rocksdb/TtlDBTest.java b/java/src/test/java/org/rocksdb/TtlDBTest.java index 0b816d66a..c60b1d512 100644 --- a/java/src/test/java/org/rocksdb/TtlDBTest.java +++ b/java/src/test/java/org/rocksdb/TtlDBTest.java @@ -33,8 +33,7 @@ public class TtlDBTest { try { options = new Options(). setCreateIfMissing(true). - setMaxGrandparentOverlapFactor(0). - setMaxMemCompactionLevel(0); + setMaxGrandparentOverlapFactor(0); ttlDB = TtlDB.open(options, dbFolder.getRoot().getAbsolutePath()); ttlDB.put("key".getBytes(), "value".getBytes()); @@ -59,8 +58,7 @@ public class TtlDBTest { try { options = new Options(). setCreateIfMissing(true). - setMaxGrandparentOverlapFactor(0). 
- setMaxMemCompactionLevel(0); + setMaxGrandparentOverlapFactor(0); ttlDB = TtlDB.open(options, dbFolder.getRoot().getAbsolutePath(), 1, false); ttlDB.put("key".getBytes(), "value".getBytes()); From 6a82fba75f39f53b58c52ed3971d90a9bce7bd37 Mon Sep 17 00:00:00 2001 From: Andres Noetzli Date: Fri, 24 Jul 2015 17:07:19 -0700 Subject: [PATCH 3/8] Add missing hashCode() implementation Summary: Whenever a Java class implements equals(), it has to implement hashCode(), otherwise there might be weird behavior when inserting instances of the class in a hash map for example. This adds two missing hashCode() implementations and extends tests to test the hashCode() implementations. Test Plan: make jtest Reviewers: rven, igor, sdong, yhchiang Reviewed By: yhchiang Subscribers: anthony, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D43017 --- Makefile | 2 +- java/src/main/java/org/rocksdb/AbstractSlice.java | 5 +++++ .../src/main/java/org/rocksdb/WBWIRocksIterator.java | 12 ++++++++++++ java/src/test/java/org/rocksdb/SliceTest.java | 1 + .../java/org/rocksdb/WriteBatchWithIndexTest.java | 1 + 5 files changed, 20 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b09bdd87f..5a09840c3 100644 --- a/Makefile +++ b/Makefile @@ -1037,7 +1037,7 @@ rocksdbjava: $(java_libobjects) jclean: cd java;$(MAKE) clean; -jtest: +jtest: rocksdbjava cd java;$(MAKE) sample;$(MAKE) test; jdb_bench: diff --git a/java/src/main/java/org/rocksdb/AbstractSlice.java b/java/src/main/java/org/rocksdb/AbstractSlice.java index 20fba41ac..a37bd023e 100644 --- a/java/src/main/java/org/rocksdb/AbstractSlice.java +++ b/java/src/main/java/org/rocksdb/AbstractSlice.java @@ -105,6 +105,11 @@ abstract class AbstractSlice extends RocksObject { return compare0(nativeHandle_, other.nativeHandle_); } + @Override + public int hashCode() { + return toString().hashCode(); + } + /** * If other is a slice object, then * we defer to {@link #compare(AbstractSlice) compare} diff --git 
a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java index 9d7f8c4e8..f42f5498b 100644 --- a/java/src/main/java/org/rocksdb/WBWIRocksIterator.java +++ b/java/src/main/java/org/rocksdb/WBWIRocksIterator.java @@ -117,6 +117,18 @@ public class WBWIRocksIterator extends AbstractRocksIterator Date: Mon, 27 Jul 2015 14:25:57 -0700 Subject: [PATCH 4/8] Fix when output level is 0 of universal compaction with trivial move Summary: Fix for universal compaction with trivial move, when the output level is 0. The tests were failing. Fixed by allowing normal compaction when output level is 0. Test Plan: modified test cases run successfully. Reviewers: sdong, yhchiang, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: anthony, kradhakrishnan, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D42933 --- db/compaction.cc | 3 ++- db/compaction_picker.cc | 21 +++++++++++++++------ db/compaction_picker.h | 2 +- db/db_impl.cc | 7 ++++--- db/db_universal_compaction_test.cc | 29 +++++++++++++++++++---------- util/auto_roll_logger_test.cc | 3 +-- 6 files changed, 42 insertions(+), 23 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index e5ec7de42..0aa17c09c 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -168,7 +168,8 @@ bool Compaction::IsTrivialMove() const { // Used in universal compaction, where trivial move can be done if the // input files are non overlapping - if (cfd_->ioptions()->compaction_options_universal.allow_trivial_move) { + if ((cfd_->ioptions()->compaction_options_universal.allow_trivial_move) && + (output_level_ != 0)) { return is_trivial_move_; } diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 13e3d94c2..650d77297 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1114,7 +1114,7 @@ void UniversalCompactionPicker::SortedRun::DumpSizeInfo( std::vector UniversalCompactionPicker::CalculateSortedRuns( - const 
VersionStorageInfo& vstorage) { + const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) { std::vector ret; for (FileMetaData* f : vstorage.LevelFiles(0)) { ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, @@ -1128,10 +1128,18 @@ UniversalCompactionPicker::CalculateSortedRuns( for (FileMetaData* f : vstorage.LevelFiles(level)) { total_compensated_size += f->compensated_file_size; total_size += f->fd.GetFileSize(); - // Compaction always includes all files for a non-zero level, so for a - // non-zero level, all the files should share the same being_compacted - // value. - assert(is_first || f->being_compacted == being_compacted); + if (ioptions.compaction_options_universal.allow_trivial_move == true) { + if (f->being_compacted) { + being_compacted = f->being_compacted; + } + } else { + // Compaction always includes all files for a non-zero level, so for a + // non-zero level, all the files should share the same being_compacted + // value. + // This assumption is only valid when + // ioptions.compaction_options_universal.allow_trivial_move is false + assert(is_first || f->being_compacted == being_compacted); + } if (is_first) { being_compacted = f->being_compacted; is_first = false; @@ -1223,7 +1231,8 @@ Compaction* UniversalCompactionPicker::PickCompaction( VersionStorageInfo* vstorage, LogBuffer* log_buffer) { const int kLevel0 = 0; double score = vstorage->CompactionScore(kLevel0); - std::vector sorted_runs = CalculateSortedRuns(*vstorage); + std::vector sorted_runs = + CalculateSortedRuns(*vstorage, ioptions_); if (sorted_runs.size() < (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) { diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 65ca73abf..61ed99505 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -275,7 +275,7 @@ class UniversalCompactionPicker : public CompactionPicker { const std::vector& sorted_runs, LogBuffer* log_buffer); static std::vector 
CalculateSortedRuns( - const VersionStorageInfo& vstorage); + const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions); // Pick a path ID to place a newly generated file, with its estimated file // size. diff --git a/db/db_impl.cc b/db/db_impl.cc index 133297c1e..3d98fbc7d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2542,8 +2542,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, int32_t moved_files = 0; int64_t moved_bytes = 0; for (unsigned int l = 0; l < c->num_input_levels(); l++) { - if (l == static_cast(c->output_level()) || - (c->output_level() == 0)) { + if (c->level(l) == c->output_level()) { continue; } for (size_t i = 0; i < c->num_input_files(l); i++) { @@ -2591,7 +2590,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); } else { - TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial"); + int output_level __attribute__((unused)) = c->output_level(); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", + &output_level); assert(is_snapshot_supported_ || snapshots_.empty()); CompactionJob compaction_job( job_context->job_id, c.get(), db_options_, env_options_, diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index cae56eab1..c788157bc 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -452,8 +452,12 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { "DBImpl::BackgroundCompaction:TrivialMove", [&](void* arg) { trivial_move++; }); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BackgroundCompaction:NonTrivial", - [&](void* arg) { non_trivial_move++; }); + "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { + non_trivial_move++; + ASSERT_TRUE(arg != nullptr); + int output_level = *(static_cast(arg)); + ASSERT_EQ(output_level, 0); + }); 
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Options options; @@ -462,7 +466,7 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { options.num_levels = 3; options.write_buffer_size = 100 << 10; // 100KB options.level0_file_num_compaction_trigger = 3; - options.max_background_compactions = 1; + options.max_background_compactions = 2; options.target_file_size_base = 32 * 1024; options = CurrentOptions(options); DestroyAndReopen(options); @@ -474,7 +478,7 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { ReopenWithColumnFamilies({"default", "pikachu"}, options); Random rnd(301); - int num_keys = 15000; + int num_keys = 150000; for (int i = 0; i < num_keys; i++) { ASSERT_OK(Put(1, Key(i), Key(i))); } @@ -484,7 +488,7 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { dbfull()->TEST_WaitForCompact(); ASSERT_GT(trivial_move, 0); - ASSERT_EQ(non_trivial_move, 0); + ASSERT_GT(non_trivial_move, 0); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } @@ -789,14 +793,18 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) { "DBImpl::BackgroundCompaction:TrivialMove", [&](void* arg) { trivial_move++; }); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BackgroundCompaction:NonTrivial", - [&](void* arg) { non_trivial_move++; }); + "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { + non_trivial_move++; + ASSERT_TRUE(arg != nullptr); + int output_level = *(static_cast(arg)); + ASSERT_EQ(output_level, 0); + }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Options options; options.compaction_style = kCompactionStyleUniversal; options.compaction_options_universal.allow_trivial_move = true; - options.num_levels = 3; + options.num_levels = 2; options.write_buffer_size = 100 << 10; // 100KB options.level0_file_num_compaction_trigger = 3; options.max_background_compactions = 1; @@ -811,7 +819,7 @@ 
TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) { ReopenWithColumnFamilies({"default", "pikachu"}, options); Random rnd(301); - int num_keys = 150000; + int num_keys = 250000; for (int i = 0; i < num_keys; i++) { ASSERT_OK(Put(1, Key(i), Key(i))); } @@ -821,7 +829,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest1) { dbfull()->TEST_WaitForCompact(); ASSERT_GT(trivial_move, 0); - ASSERT_EQ(non_trivial_move, 0); + ASSERT_GT(non_trivial_move, 0); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } @@ -835,6 +843,7 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionTrivialMoveTest2) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { non_trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Options options; diff --git a/util/auto_roll_logger_test.cc b/util/auto_roll_logger_test.cc index ef340630a..138eb6eb4 100644 --- a/util/auto_roll_logger_test.cc +++ b/util/auto_roll_logger_test.cc @@ -28,8 +28,7 @@ class AutoRollLoggerTest : public testing::Test { // become confused std::string testDir(kTestDir); std::replace_if(testDir.begin(), testDir.end(), - [](char ch) { return ch == '/'; }, - '\\'); + [](char ch) { return ch == '/'; }, '\\'); std::string deleteCmd = "if exist " + testDir + " rd /s /q " + testDir; #else std::string deleteCmd = "rm -rf " + kTestDir; From e95c59cd2f0eb0ac4885dc46e7529193d430a08d Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Tue, 28 Jul 2015 16:41:40 -0700 Subject: [PATCH 5/8] Count number of corrupt keys during compaction Summary: For task #7771355, we would like to log the number of corrupt keys during a compaction. This patch implements and tests the count as part of CompactionJobStats. 
Test Plan: make && make check Reviewers: rven, igor, yhchiang, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42921 --- db/compaction_job.cc | 4 ++++ db/compaction_job_stats_test.cc | 6 ++++++ db/compaction_job_test.cc | 23 ++++++++++++----------- include/rocksdb/compaction_job_stats.h | 5 ++++- util/compaction_job_stats_impl.cc | 17 +++++++++-------- 5 files changed, 35 insertions(+), 20 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 1f924e9bb..2165044a9 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -385,6 +385,10 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; + + if (compaction_job_stats_ != nullptr) { + compaction_job_stats_->num_corrupt_keys++; + } } else { if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { compaction_job_stats_->num_input_deletion_records++; diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 1c066e8e9..e61dfc2c5 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -467,6 +467,9 @@ class CompactionJobStatsChecker : public EventListener { ASSERT_EQ(current_stats.num_records_replaced, stats.num_records_replaced); + ASSERT_EQ(current_stats.num_corrupt_keys, + stats.num_corrupt_keys); + ASSERT_EQ( std::string(current_stats.smallest_output_key_prefix), std::string(stats.smallest_output_key_prefix)); @@ -509,6 +512,9 @@ class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker { ASSERT_EQ( current_stats.num_records_replaced, stats.num_records_replaced); + + ASSERT_EQ(current_stats.num_corrupt_keys, + stats.num_corrupt_keys); } }; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index ab716c068..781c0dd7b 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -46,16 
+46,14 @@ void VerifyInitializationOfCompactionJobStats( ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0); ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U); + + ASSERT_EQ(compaction_job_stats.num_input_deletion_records, 0U); + ASSERT_EQ(compaction_job_stats.num_expired_deletion_records, 0U); + + ASSERT_EQ(compaction_job_stats.num_corrupt_keys, 0U); #endif // !defined(IOS_CROSS_COMPILE) } -void VerifyCompactionJobStats(const CompactionJobStats& compaction_job_stats, - const std::vector& files, - size_t num_output_files) { - ASSERT_GE(compaction_job_stats.elapsed_micros, 0U); - ASSERT_EQ(compaction_job_stats.num_input_files, files.size()); - ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files); -} } // namespace // TODO(icanadi) Make it simpler once we mock out VersionSet @@ -197,13 +195,12 @@ class CompactionJobTest : public testing::Test { LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); mutex_.Lock(); EventLogger event_logger(db_options_.info_log.get()); - CompactionJobStats compaction_job_stats; CompactionJob compaction_job( 0, &compaction, db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, {}, - table_cache_, &event_logger, false, dbname_, &compaction_job_stats); + table_cache_, &event_logger, false, dbname_, &compaction_job_stats_); - VerifyInitializationOfCompactionJobStats(compaction_job_stats); + VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); mutex_.Unlock(); @@ -214,7 +211,9 @@ class CompactionJobTest : public testing::Test { ASSERT_OK(s); mutex_.Unlock(); - VerifyCompactionJobStats(compaction_job_stats, files, 1); + ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U); + ASSERT_EQ(compaction_job_stats_.num_input_files, files.size()); + ASSERT_EQ(compaction_job_stats_.num_output_files, 1U); } Env* env_; @@ -230,6 +229,7 @@ class CompactionJobTest : public testing::Test { InstrumentedMutex mutex_; 
std::atomic shutting_down_; std::shared_ptr mock_table_factory_; + CompactionJobStats compaction_job_stats_; }; TEST_F(CompactionJobTest, Simple) { @@ -248,6 +248,7 @@ TEST_F(CompactionJobTest, SimpleCorrupted) { auto files = cfd->current()->storage_info()->LevelFiles(0); RunCompaction(files); + ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U); mock_table_factory_->AssertLatestFile(expected_results); } diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 50bbdab33..3eff7a006 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -49,12 +49,15 @@ struct CompactionJobStats { // the number of deletion entries before compaction. Deletion entries // can disappear after compaction because they expired uint64_t num_input_deletion_records; - // number of deletion records that were found obsolete and discarded // because it is not possible to delete any more keys with this entry // (i.e. all possible deletions resulting from it have been completed) uint64_t num_expired_deletion_records; + // number of corrupt keys (ParseInternalKey returned false when applied to + // the key) encountered and written out. + uint64_t num_corrupt_keys; + // 0-terminated strings storing the first 8 bytes of the smallest and // largest key in the output. static const size_t kMaxPrefixLength = 8; diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index 2496b1097..fd60d8abe 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -3,8 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
-#include -#include "include/rocksdb/compaction_job_stats.h" +#include "rocksdb/compaction_job_stats.h" namespace rocksdb { @@ -13,25 +12,27 @@ namespace rocksdb { void CompactionJobStats::Reset() { elapsed_micros = 0; + num_input_records = 0; num_input_files = 0; num_input_files_at_output_level = 0; - num_output_files = 0; - num_input_records = 0; num_output_records = 0; + num_output_files = 0; + + is_manual_compaction = 0; total_input_bytes = 0; total_output_bytes = 0; - total_input_raw_key_bytes = 0; - total_input_raw_value_bytes = 0; - num_records_replaced = 0; - is_manual_compaction = 0; + total_input_raw_key_bytes = 0; + total_input_raw_value_bytes = 0; num_input_deletion_records = 0; num_expired_deletion_records = 0; + + num_corrupt_keys = 0; } #else From d06c82e477c7c1337d9cca8780e9e84c2893137f Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Tue, 28 Jul 2015 19:21:55 -0700 Subject: [PATCH 6/8] Further cleanup of CompactionJob and MergeHelper Summary: Simplified logic in CompactionJob and removed unused parameter in MergeHelper. 
Test Plan: make && make check Reviewers: rven, igor, sdong, yhchiang Reviewed By: sdong Subscribers: aekmekji, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42687 --- db/compaction_job.cc | 311 ++++++++++++++++++---------------------- db/merge_helper.cc | 13 +- db/merge_helper.h | 13 +- db/merge_helper_test.cc | 20 ++- 4 files changed, 156 insertions(+), 201 deletions(-) diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 2165044a9..97a24c5ca 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -210,29 +210,15 @@ Status CompactionJob::Run() { ThreadStatus::STAGE_COMPACTION_RUN); TEST_SYNC_POINT("CompactionJob::Run():Start"); log_buffer_->FlushBufferToLog(); - ColumnFamilyData* cfd = compact_->compaction->column_family_data(); auto* compaction = compact_->compaction; - LogCompaction(cfd, compaction); + LogCompaction(compaction->column_family_data(), compaction); + int64_t imm_micros = 0; // Micros spent doing imm_ compactions const uint64_t start_micros = env_->NowMicros(); - std::unique_ptr input( - versions_->MakeInputIterator(compact_->compaction)); - input->SeekToFirst(); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions + std::unique_ptr input(versions_->MakeInputIterator(compaction)); + input->SeekToFirst(); auto status = ProcessKeyValueCompaction(&imm_micros, input.get()); - - if (status.ok() && - (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { - status = Status::ShutdownInProgress( - "Database shutdown or Column family drop during compaction"); - } - if (status.ok() && compact_->builder != nullptr) { - status = FinishCompactionOutputFile(input->status()); - } - if (status.ok()) { - status = input->status(); - } input.reset(); if (output_directory_ && !db_options_.disableDataSync) { @@ -331,7 +317,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, false /* internal key corruption is expected */); auto compaction_filter = 
cfd->ioptions()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; - if (!compaction_filter) { + if (compaction_filter == nullptr) { compaction_filter_from_factory = compact_->compaction->CreateCompactionFilter(); compaction_filter = compaction_filter_from_factory.get(); @@ -346,6 +332,9 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, StopWatchNano timer(env_, stats_ != nullptr); uint64_t total_filter_time = 0; + + // TODO(noetzli): check whether we could check !shutting_down_->... only + // only occasionally (see diff D42687) while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) && !cfd->IsDropped() && status.ok()) { compact_->num_input_records++; @@ -360,10 +349,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, Slice value = input->value(); if (compaction_job_stats_ != nullptr) { - compaction_job_stats_->total_input_raw_key_bytes += - input->key().size(); - compaction_job_stats_->total_input_raw_value_bytes += - input->value().size(); + compaction_job_stats_->total_input_raw_key_bytes += key.size(); + compaction_job_stats_->total_input_raw_value_bytes += value.size(); } if (compact_->compaction->ShouldStopBefore(key) && @@ -375,8 +362,6 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, } // Handle key/value, add to state, etc. - bool drop = false; - bool current_entry_is_merging = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys // TODO: error key stays in db forever? 
Figure out the intention/rationale @@ -389,177 +374,157 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, if (compaction_job_stats_ != nullptr) { compaction_job_stats_->num_corrupt_keys++; } - } else { - if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { - compaction_job_stats_->num_input_deletion_records++; - } - if (!has_current_user_key || - cfd->user_comparator()->Compare(ikey.user_key, - current_user_key.GetKey()) != 0) { - // First occurrence of this user key - current_user_key.SetKey(ikey.user_key); - has_current_user_key = true; - last_sequence_for_key = kMaxSequenceNumber; - visible_in_snapshot = kMaxSequenceNumber; - // apply the compaction filter to the first occurrence of the user key - if (compaction_filter && ikey.type == kTypeValue && - (visible_at_tip_ || ikey.sequence > latest_snapshot_)) { - // If the user has specified a compaction filter and the sequence - // number is greater than any external snapshot, then invoke the - // filter. - // If the return value of the compaction filter is true, replace - // the entry with a delete marker. 
- bool value_changed = false; - compaction_filter_value.clear(); - if (stats_ != nullptr) { - timer.Start(); - } - bool to_delete = compaction_filter->Filter( - compact_->compaction->level(), ikey.user_key, value, - &compaction_filter_value, &value_changed); - total_filter_time += timer.ElapsedNanos(); - if (to_delete) { - // make a copy of the original key and convert it to a delete - delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence, - kTypeDeletion); - // anchor the key again - key = delete_key.GetKey(); - // needed because ikey is backed by key - ParseInternalKey(key, &ikey); - // no value associated with delete - value.clear(); - ++key_drop_user; - } else if (value_changed) { - value = compaction_filter_value; - } - } - } + status = WriteKeyValue(key, value, ikey, input->status()); + input->Next(); + continue; + } - // If there are no snapshots, then this kv affect visibility at tip. - // Otherwise, search though all existing snapshots to find - // the earlist snapshot that is affected by this kv. - SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot - SequenceNumber visible = - visible_at_tip_ - ? visible_at_tip_ - : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_, - &prev_snapshot); - - if (visible_in_snapshot == visible) { - // If the earliest snapshot is which this key is visible in - // is the same as the visibily of a previous instance of the - // same key, then this kv is not visible in any snapshot. - // Hidden by an newer entry for same user key - // TODO: why not > ? 
- assert(last_sequence_for_key >= ikey.sequence); - drop = true; // (A) - ++key_drop_newer_entry; - } else if (ikey.type == kTypeDeletion && - ikey.sequence <= earliest_snapshot_ && - compact_->compaction->KeyNotExistsBeyondOutputLevel( - ikey.user_key)) { - // For this user key: - // (1) there is no data in higher levels - // (2) data in lower levels will have larger sequence numbers - // (3) data in layers that are being compacted here and have - // smaller sequence numbers will be dropped in the next - // few iterations of this loop (by rule (A) above). - // Therefore this deletion marker is obsolete and can be dropped. - drop = true; - ++key_drop_obsolete; - } else if (ikey.type == kTypeMerge) { - if (!merge.HasOperator()) { - LogToBuffer(log_buffer_, "Options::merge_operator is null."); - status = Status::InvalidArgument( - "merge_operator is not properly initialized."); - break; + if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) { + compaction_job_stats_->num_input_deletion_records++; + } + + if (!has_current_user_key || + cfd->user_comparator()->Compare(ikey.user_key, + current_user_key.GetKey()) != 0) { + // First occurrence of this user key + current_user_key.SetKey(ikey.user_key); + has_current_user_key = true; + last_sequence_for_key = kMaxSequenceNumber; + visible_in_snapshot = kMaxSequenceNumber; + // apply the compaction filter to the first occurrence of the user key + if (compaction_filter && ikey.type == kTypeValue && + (visible_at_tip_ || ikey.sequence > latest_snapshot_)) { + // If the user has specified a compaction filter and the sequence + // number is greater than any external snapshot, then invoke the + // filter. + // If the return value of the compaction filter is true, replace + // the entry with a delete marker. 
+ bool value_changed = false; + compaction_filter_value.clear(); + if (stats_ != nullptr) { + timer.Start(); } - // We know the merge type entry is not hidden, otherwise we would - // have hit (A) - // We encapsulate the merge related state machine in a different - // object to minimize change to the existing flow. Turn out this - // logic could also be nicely re-used for memtable flush purge - // optimization in BuildTable. - merge.MergeUntil(input, prev_snapshot, bottommost_level_, - db_options_.statistics.get(), nullptr, env_); - - current_entry_is_merging = true; - if (merge.IsSuccess()) { - // Successfully found Put/Delete/(end-of-key-range) while merging - // Get the merge result - key = merge.key(); + bool to_delete = compaction_filter->Filter( + compact_->compaction->level(), ikey.user_key, value, + &compaction_filter_value, &value_changed); + total_filter_time += timer.ElapsedNanos(); + if (to_delete) { + // make a copy of the original key and convert it to a delete + delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence, + kTypeDeletion); + // anchor the key again + key = delete_key.GetKey(); + // needed because ikey is backed by key ParseInternalKey(key, &ikey); - value = merge.value(); - } else { - // Did not find a Put/Delete/(end-of-key-range) while merging - // We now have some stack of merge operands to write out. - // NOTE: key,value, and ikey are now referring to old entries. - // These will be correctly set below. - assert(!merge.keys().empty()); - assert(merge.keys().size() == merge.values().size()); - - // Hack to make sure last_sequence_for_key is correct - ParseInternalKey(merge.keys().front(), &ikey); + // no value associated with delete + value.clear(); + ++key_drop_user; + } else if (value_changed) { + value = compaction_filter_value; } } - - last_sequence_for_key = ikey.sequence; - visible_in_snapshot = visible; } - if (!drop) { - // We may write a single key (e.g.: for Put/Delete or successful merge). 
- // Or we may instead have to write a sequence/list of keys. - // We have to write a sequence iff we have an unsuccessful merge - if (current_entry_is_merging && !merge.IsSuccess()) { + // If there are no snapshots, then this kv affect visibility at tip. + // Otherwise, search though all existing snapshots to find + // the earlist snapshot that is affected by this kv. + SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot + SequenceNumber visible = + visible_at_tip_ + ? visible_at_tip_ + : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_, + &prev_snapshot); + + if (visible_in_snapshot == visible) { + // If the earliest snapshot is which this key is visible in + // is the same as the visibily of a previous instance of the + // same key, then this kv is not visible in any snapshot. + // Hidden by an newer entry for same user key + // TODO: why not > ? + assert(last_sequence_for_key >= ikey.sequence); + ++key_drop_newer_entry; + input->Next(); // (A) + } else if (ikey.type == kTypeDeletion && + ikey.sequence <= earliest_snapshot_ && + compact_->compaction->KeyNotExistsBeyondOutputLevel( + ikey.user_key)) { + // For this user key: + // (1) there is no data in higher levels + // (2) data in lower levels will have larger sequence numbers + // (3) data in layers that are being compacted here and have + // smaller sequence numbers will be dropped in the next + // few iterations of this loop (by rule (A) above). + // Therefore this deletion marker is obsolete and can be dropped. 
+ ++key_drop_obsolete; + input->Next(); + } else if (ikey.type == kTypeMerge) { + if (!merge.HasOperator()) { + LogToBuffer(log_buffer_, "Options::merge_operator is null."); + status = Status::InvalidArgument( + "merge_operator is not properly initialized."); + break; + } + // We know the merge type entry is not hidden, otherwise we would + // have hit (A) + // We encapsulate the merge related state machine in a different + // object to minimize change to the existing flow. Turn out this + // logic could also be nicely re-used for memtable flush purge + // optimization in BuildTable. + merge.MergeUntil(input, prev_snapshot, bottommost_level_, + db_options_.statistics.get(), env_); + + if (merge.IsSuccess()) { + // Successfully found Put/Delete/(end-of-key-range) while merging + // Get the merge result + key = merge.key(); + ParseInternalKey(key, &ikey); + status = WriteKeyValue(key, merge.value(), ikey, input->status()); + } else { + // Did not find a Put/Delete/(end-of-key-range) while merging + // We now have some stack of merge operands to write out. + // NOTE: key,value, and ikey are now referring to old entries. + // These will be correctly set below. const auto& keys = merge.keys(); const auto& values = merge.values(); - std::deque::const_reverse_iterator key_iter = - keys.rbegin(); // The back (*rbegin()) is the first key - std::deque::const_reverse_iterator value_iter = - values.rbegin(); - - key = Slice(*key_iter); - value = Slice(*value_iter); - - // We have a list of keys to write, traverse the list. - while (true) { - status = WriteKeyValue(key, value, ikey, input->status()); - if (!status.ok()) { - break; - } - - ++key_iter; - ++value_iter; + assert(!keys.empty()); + assert(keys.size() == values.size()); - // If at end of list - if (key_iter == keys.rend() || value_iter == values.rend()) { - // Sanity Check: if one ends, then both end - assert(key_iter == keys.rend() && value_iter == values.rend()); - break; - } - - // Otherwise not at end of list. 
Update key, value, and ikey. + // We have a list of keys to write, write all keys in the list. + for (auto key_iter = keys.rbegin(), value_iter = values.rbegin(); + !status.ok() || key_iter != keys.rend(); + key_iter++, value_iter++) { key = Slice(*key_iter); value = Slice(*value_iter); ParseInternalKey(key, &ikey); + status = WriteKeyValue(key, value, ikey, input->status()); } - } else { - // There is only one item to be written out - status = WriteKeyValue(key, value, ikey, input->status()); } - } // if (!drop) - - // MergeUntil has moved input to the next entry - if (!current_entry_is_merging) { + } else { + status = WriteKeyValue(key, value, ikey, input->status()); input->Next(); } + + last_sequence_for_key = ikey.sequence; + visible_in_snapshot = visible; } + RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time); RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete); RecordCompactionIOStats(); + if (status.ok() && + (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) { + status = Status::ShutdownInProgress( + "Database shutdown or Column family drop during compaction"); + } + if (status.ok() && compact_->builder != nullptr) { + status = FinishCompactionOutputFile(input->status()); + } + if (status.ok()) { + status = input->status(); + } + return status; } diff --git a/db/merge_helper.cc b/db/merge_helper.cc index cf58c6812..c8a4b140c 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -58,8 +58,8 @@ Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value, // keys_ stores the list of keys encountered while merging. // operands_ stores the list of merge operands encountered while merging. // keys_[i] corresponds to operands_[i] for each i. 
-void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, - bool at_bottom, Statistics* stats, int* steps, +void MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before, + const bool at_bottom, Statistics* stats, Env* env_) { // Get a copy of the internal key, before it's invalidated by iter->Next() // Also maintain the list of merge operands seen. @@ -81,9 +81,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; - if (steps) { - ++(*steps); - } for (iter->Next(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; assert(operands_.size() >= 1); // Should be invariants! @@ -138,9 +135,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // move iter to the next entry iter->Next(); - if (steps) { - ++(*steps); - } return; } else { // hit a merge @@ -153,9 +147,6 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, // request or later did a partial merge. keys_.push_front(iter->key().ToString()); operands_.push_front(iter->value().ToString()); - if (steps) { - ++(*steps); - } } } diff --git a/db/merge_helper.h b/db/merge_helper.h index 7722446dd..8ad6acc07 100644 --- a/db/merge_helper.h +++ b/db/merge_helper.h @@ -6,11 +6,12 @@ #ifndef MERGE_HELPER_H #define MERGE_HELPER_H -#include "db/dbformat.h" -#include "rocksdb/slice.h" -#include #include +#include + +#include "db/dbformat.h" #include "rocksdb/env.h" +#include "rocksdb/slice.h" namespace rocksdb { @@ -56,9 +57,9 @@ class MergeHelper { // 0 means no restriction // at_bottom: (IN) true if the iterator covers the bottem level, which means // we could reach the start of the history of this user key. 
- void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, - bool at_bottom = false, Statistics* stats = nullptr, - int* steps = nullptr, Env* env_ = nullptr); + void MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0, + const bool at_bottom = false, Statistics* stats = nullptr, + Env* env_ = nullptr); // Query the merge result // These are valid until the next MergeUntil call diff --git a/db/merge_helper_test.cc b/db/merge_helper_test.cc index a1ac91014..1dd17fb5f 100644 --- a/db/merge_helper_test.cc +++ b/db/merge_helper_test.cc @@ -18,7 +18,7 @@ namespace rocksdb { class MergeHelperTest : public testing::Test { public: - MergeHelperTest() : steps_(0) {} + MergeHelperTest() = default; ~MergeHelperTest() = default; void RunUInt64MergeHelper(SequenceNumber stop_before, bool at_bottom) { @@ -27,7 +27,7 @@ class MergeHelperTest : public testing::Test { merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), nullptr, 2U, true)); merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - &steps_, Env::Default()); + Env::Default()); } void RunStringAppendMergeHelper(SequenceNumber stop_before, bool at_bottom) { @@ -36,7 +36,7 @@ class MergeHelperTest : public testing::Test { merge_helper_.reset(new MergeHelper(BytewiseComparator(), merge_op_.get(), nullptr, 2U, true)); merge_helper_->MergeUntil(iter_.get(), stop_before, at_bottom, nullptr, - &steps_, Env::Default()); + Env::Default()); } std::string Key(const std::string& user_key, const SequenceNumber& seq, @@ -63,9 +63,8 @@ class MergeHelperTest : public testing::Test { return result; } - void CheckState(bool success, int steps, int iter_pos) { + void CheckState(bool success, int iter_pos) { ASSERT_EQ(success, merge_helper_->IsSuccess()); - ASSERT_EQ(steps, steps_); if (iter_pos == -1) { ASSERT_FALSE(iter_->Valid()); } else { @@ -78,7 +77,6 @@ class MergeHelperTest : public testing::Test { std::unique_ptr merge_helper_; std::vector ks_; std::vector vs_; - int 
steps_; }; // If MergeHelper encounters a new key on the last level, we know that @@ -89,7 +87,7 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) { AddKeyVal("b", 10, kTypeMerge, EncodeInt(4U)); // <- Iterator after merge RunUInt64MergeHelper(0, true); - CheckState(true, 2, 2); + CheckState(true, 2); ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->key()); ASSERT_EQ(EncodeInt(4U), merge_helper_->value()); } @@ -102,7 +100,7 @@ TEST_F(MergeHelperTest, MergeValue) { AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); RunUInt64MergeHelper(0, false); - CheckState(true, 3, 3); + CheckState(true, 3); ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->key()); ASSERT_EQ(EncodeInt(8U), merge_helper_->value()); } @@ -116,7 +114,7 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) { AddKeyVal("a", 10, kTypeMerge, EncodeInt(1U)); RunUInt64MergeHelper(31, true); - CheckState(false, 2, 2); + CheckState(false, 2); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]); } @@ -129,7 +127,7 @@ TEST_F(MergeHelperTest, NoPartialMerge) { AddKeyVal("a", 30, kTypeMerge, "v"); RunStringAppendMergeHelper(31, true); - CheckState(false, 2, 2); + CheckState(false, 2); ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]); ASSERT_EQ("v", merge_helper_->values()[0]); ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]); @@ -142,7 +140,7 @@ TEST_F(MergeHelperTest, MergeDeletion) { AddKeyVal("a", 20, kTypeDeletion, ""); RunUInt64MergeHelper(15, false); - CheckState(true, 2, -1); + CheckState(true, -1); ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->key()); ASSERT_EQ(EncodeInt(3U), merge_helper_->value()); } From 7bfae3a7237c293b84f85437b2f3c2a3143d5116 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 28 Jul 2015 18:31:58 -0700 Subject: [PATCH 7/8] tools/db_crashtest2.py should run on the same DB Summary: Crash tests are supposed to restart the same DB after crashing, but it is now opening a different DB. Fix it. 
It's probably a leftover of https://reviews.facebook.net/D17073 Test Plan: Run the test and make sure the same Db is opened. Reviewers: kradhakrishnan, rven, igor, IslamAbdelRahman, yhchiang, anthony Reviewed By: anthony Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D43197 --- tools/db_crashtest2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index bca46b853..bd6ee0840 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -63,6 +63,7 @@ def main(argv): total_check_mode = 4 check_mode = 0 + dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_') while time.time() < exit_time: killoption = "" if check_mode == 0: @@ -85,7 +86,6 @@ def main(argv): # normal run additional_opts = "--ops_per_thread=" + str(ops_per_thread) - dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_') cmd = re.sub('\s+', ' ', """ ./db_stress --test_batches_snapshots=%s From 8161bdb5a0da1d65a1b5d8ac8226e1386003140f Mon Sep 17 00:00:00 2001 From: agiardullo Date: Fri, 10 Jul 2015 20:15:45 -0700 Subject: [PATCH 8/8] WriteBatch Save Points Summary: Support RollbackToSavePoint() in WriteBatch and WriteBatchWithIndex. Support for partial transaction rollback is needed for MyRocks. An alternate implementation of Transaction::RollbackToSavePoint() exists in D40869. However, the other implementation is messier because it is implemented outside of WriteBatch. This implementation is much cleaner and also exposes a potentially useful feature to WriteBatch. 
Test Plan: Added unit tests Reviewers: IslamAbdelRahman, kradhakrishnan, maykov, yoshinorim, hermanlee4, spetrunia, sdong, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42723 --- HISTORY.md | 3 + db/write_batch.cc | 63 +++++++- db/write_batch_internal.h | 4 + db/write_batch_test.cc | 105 +++++++++++++ .../utilities/write_batch_with_index.h | 16 ++ include/rocksdb/write_batch.h | 16 +- include/rocksdb/write_batch_base.h | 11 ++ .../write_batch_with_index.cc | 79 +++++++++- .../write_batch_with_index_internal.cc | 7 +- .../write_batch_with_index_test.cc | 144 ++++++++++++++++++ 10 files changed, 442 insertions(+), 6 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 33721c8f4..0e6db1347 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,9 @@ ## Unreleased +### New Features +* RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex + ### Public API Changes * Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it. * Deprecated purge_redundant_kvs_while_flush option. diff --git a/db/write_batch.cc b/db/write_batch.cc index 53509b90f..44e1cd84d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -23,6 +23,10 @@ // data: uint8[len] #include "rocksdb/write_batch.h" + +#include +#include + #include "rocksdb/merge_operator.h" #include "db/dbformat.h" #include "db/db_impl.h" @@ -32,7 +36,6 @@ #include "db/write_batch_internal.h" #include "util/coding.h" #include "util/statistics.h" -#include #include "util/perf_context_imp.h" namespace rocksdb { @@ -40,12 +43,26 @@ namespace rocksdb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. 
static const size_t kHeader = 12; -WriteBatch::WriteBatch(size_t reserved_bytes) { +struct SavePoint { + size_t size; // size of rep_ + int count; // count of elements in rep_ + SavePoint(size_t s, int c) : size(s), count(c) {} +}; + +struct SavePoints { + std::stack stack; +}; + +WriteBatch::WriteBatch(size_t reserved_bytes) : save_points_(nullptr) { rep_.reserve((reserved_bytes > kHeader) ? reserved_bytes : kHeader); Clear(); } -WriteBatch::~WriteBatch() { } +WriteBatch::~WriteBatch() { + if (save_points_ != nullptr) { + delete save_points_; + } +} WriteBatch::Handler::~Handler() { } @@ -61,6 +78,12 @@ bool WriteBatch::Handler::Continue() { void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); + + if (save_points_ != nullptr) { + while (!save_points_->stack.empty()) { + save_points_->stack.pop(); + } + } } int WriteBatch::Count() const { @@ -188,6 +211,8 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } +size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) { return kHeader; } + void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); @@ -301,6 +326,38 @@ void WriteBatch::PutLogData(const Slice& blob) { PutLengthPrefixedSlice(&rep_, blob); } +void WriteBatch::SetSavePoint() { + if (save_points_ == nullptr) { + save_points_ = new SavePoints(); + } + // Record length and count of current batch of writes. 
+ save_points_->stack.push(SavePoint(GetDataSize(), Count())); +} + +Status WriteBatch::RollbackToSavePoint() { + if (save_points_ == nullptr || save_points_->stack.size() == 0) { + return Status::NotFound(); + } + + // Pop the most recent savepoint off the stack + SavePoint savepoint = save_points_->stack.top(); + save_points_->stack.pop(); + + assert(savepoint.size <= rep_.size()); + + if (savepoint.size == rep_.size()) { + // No changes to rollback + } else if (savepoint.size == 0) { + // Rollback everything + Clear(); + } else { + rep_.resize(savepoint.size); + WriteBatchInternal::SetCount(this, savepoint.count); + } + + return Status::OK(); +} + namespace { // This class can *only* be used from a single-threaded write thread, because it // calls ColumnFamilyMemTablesImpl::Seek() diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 18f106776..0718057e8 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -92,6 +92,10 @@ class WriteBatchInternal { // this batch. static void SetSequence(WriteBatch* batch, SequenceNumber seq); + // Returns the offset of the first entry in the batch. + // This offset is only valid if the batch is not empty. 
+ static size_t GetFirstOffset(WriteBatch* batch); + static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 2ba43d2d3..077a54fb2 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -446,6 +446,111 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) { } #endif // !ROCKSDB_LITE +TEST_F(WriteBatchTest, SavePointTest) { + Status s; + WriteBatch batch; + batch.SetSavePoint(); + + batch.Put("A", "a"); + batch.Put("B", "b"); + batch.SetSavePoint(); + + batch.Put("C", "c"); + batch.Delete("A"); + batch.SetSavePoint(); + batch.SetSavePoint(); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@3" + "Put(A, a)@0" + "Put(B, b)@1" + "Put(C, c)@2", + PrintContents(&batch)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Put(A, a)@0" + "Put(B, b)@1", + PrintContents(&batch)); + + batch.Delete("A"); + batch.Put("B", "bb"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("", PrintContents(&batch)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch)); + + batch.Put("D", "d"); + batch.Delete("A"); + + batch.SetSavePoint(); + + batch.Put("A", "aaa"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + batch.SetSavePoint(); + + batch.Put("D", "d"); + batch.Delete("A"); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ( + "Delete(A)@1" + "Put(D, d)@0", + PrintContents(&batch)); + + WriteBatch batch2; + + s = batch2.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch2)); + + batch2.Delete("A"); + batch2.SetSavePoint(); + + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + 
ASSERT_EQ("Delete(A)@0", PrintContents(&batch2)); + + batch2.Clear(); + ASSERT_EQ("", PrintContents(&batch2)); + + batch2.SetSavePoint(); + + batch2.Delete("B"); + ASSERT_EQ("Delete(B)@0", PrintContents(&batch2)); + + batch2.SetSavePoint(); + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + ASSERT_EQ("Delete(B)@0", PrintContents(&batch2)); + + s = batch2.RollbackToSavePoint(); + ASSERT_OK(s); + ASSERT_EQ("", PrintContents(&batch2)); + + s = batch2.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("", PrintContents(&batch2)); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 22e7253c1..c1d27c17e 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -157,6 +157,22 @@ class WriteBatchWithIndex : public WriteBatchBase { ColumnFamilyHandle* column_family, const Slice& key, std::string* value); + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + void SetSavePoint() override; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), behaves the same as + // Clear(). + // + // Calling RollbackToSavePoint invalidates any open iterators on this batch. + // + // Returns Status::OK() on success, + // Status::NotFound() if no previous call to SetSavePoint(), + // or other Status on corruption. 
+ Status RollbackToSavePoint() override; + private: struct Rep; Rep* rep; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index d76c96f7f..7fb4b6e52 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -25,6 +25,7 @@ #ifndef STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ #define STORAGE_ROCKSDB_INCLUDE_WRITE_BATCH_H_ +#include #include #include #include "rocksdb/status.h" @@ -34,6 +35,7 @@ namespace rocksdb { class Slice; class ColumnFamilyHandle; +struct SavePoints; struct SliceParts; class WriteBatch : public WriteBatchBase { @@ -101,6 +103,17 @@ class WriteBatch : public WriteBatchBase { // Clear all updates buffered in this batch. void Clear() override; + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + void SetSavePoint() override; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), Status::NotFound() + // will be returned. + // Oterwise returns Status::OK(). + Status RollbackToSavePoint() override; + // Support for iterating over the contents of a batch. 
class Handler { public: @@ -168,10 +181,11 @@ class WriteBatch : public WriteBatchBase { WriteBatch* GetWriteBatch() override { return this; } // Constructor with a serialized string object - explicit WriteBatch(std::string rep): rep_(rep) {} + explicit WriteBatch(std::string rep) : save_points_(nullptr), rep_(rep) {} private: friend class WriteBatchInternal; + SavePoints* save_points_; protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index cfe3ceb48..d64ffc200 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -11,6 +11,7 @@ namespace rocksdb { class Slice; +class Status; class ColumnFamilyHandle; class WriteBatch; struct SliceParts; @@ -72,6 +73,16 @@ class WriteBatchBase { // converting any WriteBatchBase(eg WriteBatchWithIndex) into a basic // WriteBatch. virtual WriteBatch* GetWriteBatch() = 0; + + // Records the state of the batch for future calls to RollbackToSavePoint(). + // May be called multiple times to set multiple save points. + virtual void SetSavePoint() = 0; + + // Remove all entries in this batch (Put, Merge, Delete, PutLogData) since the + // most recent call to SetSavePoint() and removes the most recent save point. + // If there is no previous call to SetSavePoint(), behaves the same as + // Clear(). + virtual Status RollbackToSavePoint() = 0; }; } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index be8d93ccf..507aff248 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -400,6 +400,11 @@ struct WriteBatchWithIndex::Rep { // Clear all updates buffered in this batch. void Clear(); + void ClearIndex(); + + // Rebuild index by reading all records from the batch. 
+ // Returns non-ok status on corruption. + Status ReBuildIndex(); }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( @@ -455,13 +460,73 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { void WriteBatchWithIndex::Rep::Clear() { write_batch.Clear(); + ClearIndex(); + } + + void WriteBatchWithIndex::Rep::ClearIndex() { + skip_list.~WriteBatchEntrySkipList(); arena.~Arena(); new (&arena) Arena(); - skip_list.~WriteBatchEntrySkipList(); new (&skip_list) WriteBatchEntrySkipList(comparator, &arena); last_entry_offset = 0; } + Status WriteBatchWithIndex::Rep::ReBuildIndex() { + Status s; + + ClearIndex(); + + if (write_batch.Count() == 0) { + // Nothing to re-index + return s; + } + + size_t offset = WriteBatchInternal::GetFirstOffset(&write_batch); + + Slice input(write_batch.Data()); + input.remove_prefix(offset); + + // Loop through all entries in Rep and add each one to the index + int found = 0; + while (s.ok() && !input.empty()) { + Slice key, value, blob; + uint32_t column_family_id = 0; // default + char tag = 0; + + // set offset of current entry for call to AddNewEntry() + last_entry_offset = input.data() - write_batch.Data().data(); + + s = ReadRecordFromWriteBatch(&input, &tag, &column_family_id, &key, + &value, &blob); + if (!s.ok()) { + break; + } + + switch (tag) { + case kTypeColumnFamilyValue: + case kTypeValue: + case kTypeColumnFamilyDeletion: + case kTypeDeletion: + case kTypeColumnFamilyMerge: + case kTypeMerge: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key)) { + AddNewEntry(column_family_id); + } + break; + case kTypeLogData: + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + + if (s.ok() && found != write_batch.Count()) { + s = Status::Corruption("WriteBatch has wrong count"); + } + + return s; + } WriteBatchWithIndex::WriteBatchWithIndex( const Comparator* default_index_comparator, size_t reserved_bytes, @@ -640,5 +705,17 @@ Status 
WriteBatchWithIndex::GetFromBatchAndDB(DB* db, return s; } +void WriteBatchWithIndex::SetSavePoint() { rep->write_batch.SetSavePoint(); } + +Status WriteBatchWithIndex::RollbackToSavePoint() { + Status s = rep->write_batch.RollbackToSavePoint(); + + if (s.ok()) { + s = rep->ReBuildIndex(); + } + + return s; +} + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index 2c0033ce1..f5c141121 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -30,7 +30,12 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, return Status::InvalidArgument("Output parameters cannot be null"); } - if (data_offset >= GetDataSize()) { + if (data_offset == GetDataSize()) { + // reached end of batch. + return Status::NotFound(); + } + + if (data_offset > GetDataSize()) { return Status::InvalidArgument("data offset exceed write batch size"); } Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 70337f885..3e509ca93 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -1364,6 +1364,150 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { } } } + +static std::string PrintContents(WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family) { + std::string result; + + WBWIIterator* iter; + if (column_family == nullptr) { + iter = batch->NewIterator(); + } else { + iter = batch->NewIterator(column_family); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + WriteEntry e = iter->Entry(); + + if (e.type == kPutRecord) { + 
result.append("PUT("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kMergeRecord) { + result.append("MERGE("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else { + assert(e.type == kDeleteRecord); + result.append("DEL("); + result.append(e.key.ToString()); + result.append(")"); + } + + result.append(","); + iter->Next(); + } + + delete iter; + return result; +} + +TEST_F(WriteBatchWithIndexTest, SavePointTest) { + WriteBatchWithIndex batch; + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + Status s; + + batch.Put("A", "a"); + batch.Put("B", "b"); + batch.Put("A", "aa"); + batch.Put(&cf1, "A", "a1"); + batch.Delete(&cf1, "B"); + batch.Put(&cf1, "C", "c1"); + + batch.SetSavePoint(); + + batch.Put("C", "cc"); + batch.Put("B", "bb"); + batch.Delete("A"); + batch.Put(&cf1, "B", "b1"); + batch.Delete(&cf1, "A"); + batch.SetSavePoint(); + + batch.Put("A", "aaa"); + batch.Put("A", "xxx"); + batch.Delete("B"); + batch.Put(&cf1, "B", "b2"); + batch.Delete(&cf1, "C"); + batch.SetSavePoint(); + batch.SetSavePoint(); + batch.Delete("D"); + batch.Delete(&cf1, "D"); + + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,DEL(D),", + PrintContents(&batch, nullptr)); + + ASSERT_EQ( + "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," + "DEL(D),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ( + "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" + "B)" + ",PUT(C):cc,", + PrintContents(&batch, nullptr)); + + 
ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C),", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,", + PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); + batch.Put("X", "x"); + + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,", + PrintContents(&batch, nullptr)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", + PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1,", + PrintContents(&batch, &cf1)); + + ASSERT_OK(batch.RollbackToSavePoint()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + + ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,", PrintContents(&batch, &cf1)); + + batch.SetSavePoint(); + + batch.Clear(); + ASSERT_EQ("", PrintContents(&batch, nullptr)); + ASSERT_EQ("", PrintContents(&batch, &cf1)); + + s = batch.RollbackToSavePoint(); + ASSERT_TRUE(s.IsNotFound()); +} + } // namespace int main(int argc, char** argv) {