// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" #include "port/stack_trace.h" #include "rocksdb/perf_context.h" #include "rocksdb/utilities/debug.h" #include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_builder.h" #include "test_util/fault_injection_test_env.h" #if !defined(ROCKSDB_LITE) #include "test_util/sync_point.h" #endif namespace ROCKSDB_NAMESPACE { class DBBasicTest : public DBTestBase { public: DBBasicTest() : DBTestBase("/db_basic_test") {} }; TEST_F(DBBasicTest, OpenWhenOpen) { Options options = CurrentOptions(); options.env = env_; ROCKSDB_NAMESPACE::DB* db2 = nullptr; ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2); ASSERT_EQ(Status::Code::kIOError, s.code()); ASSERT_EQ(Status::SubCode::kNone, s.subcode()); ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr); delete db2; } #ifndef ROCKSDB_LITE TEST_F(DBBasicTest, ReadOnlyDB) { ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("bar", "v2")); ASSERT_OK(Put("foo", "v3")); Close(); auto options = CurrentOptions(); assert(options.env == env_); ASSERT_OK(ReadOnlyReopen(options)); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); Iterator* iter = db_->NewIterator(ReadOptions()); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); ++count; } ASSERT_EQ(count, 2); delete iter; Close(); // Reopen and flush memtable. Reopen(options); Flush(); Close(); // Now check keys in read only mode. 
ASSERT_OK(ReadOnlyReopen(options)); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); ASSERT_TRUE(db_->SyncWAL().IsNotSupported()); } TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) { ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("bar", "v2")); ASSERT_OK(Put("foo", "v3")); Close(); auto options = CurrentOptions(); options.write_dbid_to_manifest = true; assert(options.env == env_); ASSERT_OK(ReadOnlyReopen(options)); std::string db_id1; db_->GetDbIdentity(db_id1); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); Iterator* iter = db_->NewIterator(ReadOptions()); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(iter->status()); ++count; } ASSERT_EQ(count, 2); delete iter; Close(); // Reopen and flush memtable. Reopen(options); Flush(); Close(); // Now check keys in read only mode. ASSERT_OK(ReadOnlyReopen(options)); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v2", Get("bar")); ASSERT_TRUE(db_->SyncWAL().IsNotSupported()); std::string db_id2; db_->GetDbIdentity(db_id2); ASSERT_EQ(db_id1, db_id2); } TEST_F(DBBasicTest, CompactedDB) { const uint64_t kFileSize = 1 << 20; Options options = CurrentOptions(); options.disable_auto_compactions = true; options.write_buffer_size = kFileSize; options.target_file_size_base = kFileSize; options.max_bytes_for_level_base = 1 << 30; options.compression = kNoCompression; Reopen(options); // 1 L0 file, use CompactedDB if max_open_files = -1 ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1'))); Flush(); Close(); ASSERT_OK(ReadOnlyReopen(options)); Status s = Put("new", "value"); ASSERT_EQ(s.ToString(), "Not implemented: Not supported operation in read only mode."); ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa")); Close(); options.max_open_files = -1; ASSERT_OK(ReadOnlyReopen(options)); s = Put("new", "value"); ASSERT_EQ(s.ToString(), "Not implemented: Not supported in compacted db mode."); ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa")); Close(); Reopen(options); // Add 
more L0 files ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2'))); Flush(); ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a'))); Flush(); ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b'))); ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e'))); Flush(); Close(); ASSERT_OK(ReadOnlyReopen(options)); // Fallback to read-only DB s = Put("new", "value"); ASSERT_EQ(s.ToString(), "Not implemented: Not supported operation in read only mode."); Close(); // Full compaction Reopen(options); // Add more keys ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f'))); ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h'))); ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i'))); ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j'))); db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); ASSERT_EQ(3, NumTableFilesAtLevel(1)); Close(); // CompactedDB ASSERT_OK(ReadOnlyReopen(options)); s = Put("new", "value"); ASSERT_EQ(s.ToString(), "Not implemented: Not supported in compacted db mode."); ASSERT_EQ("NOT_FOUND", Get("abc")); ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa")); ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb")); ASSERT_EQ("NOT_FOUND", Get("ccc")); ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee")); ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff")); ASSERT_EQ("NOT_FOUND", Get("ggg")); ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh")); ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii")); ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj")); ASSERT_EQ("NOT_FOUND", Get("kkk")); // MultiGet std::vector values; std::vector status_list = dbfull()->MultiGet( ReadOptions(), std::vector({Slice("aaa"), Slice("ccc"), Slice("eee"), Slice("ggg"), Slice("iii"), Slice("kkk")}), &values); ASSERT_EQ(status_list.size(), static_cast(6)); ASSERT_EQ(values.size(), static_cast(6)); ASSERT_OK(status_list[0]); ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]); ASSERT_TRUE(status_list[1].IsNotFound()); ASSERT_OK(status_list[2]); 
ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]); ASSERT_TRUE(status_list[3].IsNotFound()); ASSERT_OK(status_list[4]); ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]); ASSERT_TRUE(status_list[5].IsNotFound()); Reopen(options); // Add a key ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f'))); Close(); ASSERT_OK(ReadOnlyReopen(options)); s = Put("new", "value"); ASSERT_EQ(s.ToString(), "Not implemented: Not supported operation in read only mode."); } TEST_F(DBBasicTest, LevelLimitReopen) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu"}, options); const std::string value(1024 * 1024, ' '); int i = 0; while (NumTableFilesAtLevel(2, 1) == 0) { ASSERT_OK(Put(1, Key(i++), value)); dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } options.num_levels = 1; options.max_bytes_for_level_multiplier_additional.resize(1, 1); Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ(s.IsInvalidArgument(), true); ASSERT_EQ(s.ToString(), "Invalid argument: db has more levels than options.num_levels"); options.num_levels = 10; options.max_bytes_for_level_multiplier_additional.resize(10, 1); ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); } #endif // ROCKSDB_LITE TEST_F(DBBasicTest, PutDeleteGet) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_OK(Put(1, "foo", "v2")); ASSERT_EQ("v2", Get(1, "foo")); ASSERT_OK(Delete(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(1, "foo")); } while (ChangeOptions()); } TEST_F(DBBasicTest, PutSingleDeleteGet) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_OK(Put(1, "foo2", "v2")); ASSERT_EQ("v2", Get(1, "foo2")); ASSERT_OK(SingleDelete(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(1, "foo")); // Ski FIFO and universal compaction because they do not apply to the test // case. 
Skip MergePut because single delete does not get removed when it // encounters a merge. } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction | kSkipMergePut)); } TEST_F(DBBasicTest, EmptyFlush) { // It is possible to produce empty flushes when using single deletes. Tests // whether empty flushes cause issues. do { Random rnd(301); Options options = CurrentOptions(); options.disable_auto_compactions = true; CreateAndReopenWithCF({"pikachu"}, options); Put(1, "a", Slice()); SingleDelete(1, "a"); ASSERT_OK(Flush(1)); ASSERT_EQ("[ ]", AllEntriesFor("a", 1)); // Skip FIFO and universal compaction as they do not apply to the test // case. Skip MergePut because merges cannot be combined with single // deletions. } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction | kSkipMergePut)); } TEST_F(DBBasicTest, GetFromVersions) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Flush(1)); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(0, "foo")); } while (ChangeOptions()); } #ifndef ROCKSDB_LITE TEST_F(DBBasicTest, GetSnapshot) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override)); // Try with both a short key and a long key for (int i = 0; i < 2; i++) { std::string key = (i == 0) ? 
std::string("foo") : std::string(200, 'x'); ASSERT_OK(Put(1, key, "v1")); const Snapshot* s1 = db_->GetSnapshot(); ASSERT_OK(Put(1, key, "v2")); ASSERT_EQ("v2", Get(1, key)); ASSERT_EQ("v1", Get(1, key, s1)); ASSERT_OK(Flush(1)); ASSERT_EQ("v2", Get(1, key)); ASSERT_EQ("v1", Get(1, key, s1)); db_->ReleaseSnapshot(s1); } } while (ChangeOptions()); } #endif // ROCKSDB_LITE TEST_F(DBBasicTest, CheckLock) { do { DB* localdb; Options options = CurrentOptions(); ASSERT_OK(TryReopen(options)); // second open should fail Status s = DB::Open(options, dbname_, &localdb); ASSERT_NOK(s); #ifdef OS_LINUX ASSERT_TRUE(s.ToString().find("lock hold by current process") != std::string::npos); #endif // OS_LINUX } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, FlushMultipleMemtable) { do { Options options = CurrentOptions(); WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; options.max_write_buffer_number = 4; options.min_write_buffer_number_to_merge = 3; options.max_write_buffer_size_to_maintain = -1; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(Flush(1)); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "bar")); ASSERT_OK(Flush(1)); } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, FlushEmptyColumnFamily) { // Block flush thread and disable compaction thread env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); test::SleepingBackgroundTask sleeping_task_low; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); test::SleepingBackgroundTask sleeping_task_high; env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, Env::Priority::HIGH); Options options = CurrentOptions(); // disable compaction options.disable_auto_compactions = true; WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; 
options.max_write_buffer_number = 2; options.min_write_buffer_number_to_merge = 1; options.max_write_buffer_size_to_maintain = static_cast(options.write_buffer_size); CreateAndReopenWithCF({"pikachu"}, options); // Compaction can still go through even if no thread can flush the // mem table. ASSERT_OK(Flush(0)); ASSERT_OK(Flush(1)); // Insert can go through ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); ASSERT_EQ("v1", Get(0, "foo")); ASSERT_EQ("v1", Get(1, "bar")); sleeping_task_high.WakeUp(); sleeping_task_high.WaitUntilDone(); // Flush can still go through. ASSERT_OK(Flush(0)); ASSERT_OK(Flush(1)); sleeping_task_low.WakeUp(); sleeping_task_low.WaitUntilDone(); } TEST_F(DBBasicTest, Flush) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; SetPerfLevel(kEnableTime); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); // this will now also flush the last 2 writes ASSERT_OK(Flush(1)); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); get_perf_context()->Reset(); Get(1, "foo"); ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0); ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "bar")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); ASSERT_OK(Flush(1)); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v2", Get(1, "bar")); get_perf_context()->Reset(); ASSERT_EQ("v2", Get(1, "foo")); ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); 
ASSERT_OK(Flush(1)); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); // 'foo' should be there because its put // has WAL enabled. ASSERT_EQ("v3", Get(1, "foo")); ASSERT_EQ("v3", Get(1, "bar")); SetPerfLevel(kDisable); } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, ManifestRollOver) { do { Options options; options.max_manifest_file_size = 10; // 10 bytes options = CurrentOptions(options); CreateAndReopenWithCF({"pikachu"}, options); { ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1'))); ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2'))); ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3'))); uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo(); ASSERT_OK(Flush(1)); // This should trigger LogAndApply. uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo(); ASSERT_GT(manifest_after_flush, manifest_before_flush); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush); // check if a new manifest file got inserted or not. 
ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1")); ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2")); ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3")); } } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, IdentityAcrossRestarts1) { do { std::string id1; ASSERT_OK(db_->GetDbIdentity(id1)); Options options = CurrentOptions(); Reopen(options); std::string id2; ASSERT_OK(db_->GetDbIdentity(id2)); // id1 should match id2 because identity was not regenerated ASSERT_EQ(id1.compare(id2), 0); std::string idfilename = IdentityFileName(dbname_); ASSERT_OK(env_->DeleteFile(idfilename)); Reopen(options); std::string id3; ASSERT_OK(db_->GetDbIdentity(id3)); if (options.write_dbid_to_manifest) { ASSERT_EQ(id1.compare(id3), 0); } else { // id1 should NOT match id3 because identity was regenerated ASSERT_NE(id1.compare(id3), 0); } } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, IdentityAcrossRestarts2) { do { std::string id1; ASSERT_OK(db_->GetDbIdentity(id1)); Options options = CurrentOptions(); options.write_dbid_to_manifest = true; Reopen(options); std::string id2; ASSERT_OK(db_->GetDbIdentity(id2)); // id1 should match id2 because identity was not regenerated ASSERT_EQ(id1.compare(id2), 0); std::string idfilename = IdentityFileName(dbname_); ASSERT_OK(env_->DeleteFile(idfilename)); Reopen(options); std::string id3; ASSERT_OK(db_->GetDbIdentity(id3)); // id1 should NOT match id3 because identity was regenerated ASSERT_EQ(id1, id3); } while (ChangeCompactOptions()); } #ifndef ROCKSDB_LITE TEST_F(DBBasicTest, Snapshot) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override)); Put(0, "foo", "0v1"); Put(1, "foo", "1v1"); const Snapshot* s1 = db_->GetSnapshot(); ASSERT_EQ(1U, GetNumSnapshots()); uint64_t time_snap1 = GetTimeOldestSnapshots(); ASSERT_GT(time_snap1, 0U); ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber()); 
Put(0, "foo", "0v2"); Put(1, "foo", "1v2"); env_->addon_time_.fetch_add(1); const Snapshot* s2 = db_->GetSnapshot(); ASSERT_EQ(2U, GetNumSnapshots()); ASSERT_EQ(time_snap1, GetTimeOldestSnapshots()); ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber()); Put(0, "foo", "0v3"); Put(1, "foo", "1v3"); { ManagedSnapshot s3(db_); ASSERT_EQ(3U, GetNumSnapshots()); ASSERT_EQ(time_snap1, GetTimeOldestSnapshots()); ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber()); Put(0, "foo", "0v4"); Put(1, "foo", "1v4"); ASSERT_EQ("0v1", Get(0, "foo", s1)); ASSERT_EQ("1v1", Get(1, "foo", s1)); ASSERT_EQ("0v2", Get(0, "foo", s2)); ASSERT_EQ("1v2", Get(1, "foo", s2)); ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot())); ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot())); ASSERT_EQ("0v4", Get(0, "foo")); ASSERT_EQ("1v4", Get(1, "foo")); } ASSERT_EQ(2U, GetNumSnapshots()); ASSERT_EQ(time_snap1, GetTimeOldestSnapshots()); ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber()); ASSERT_EQ("0v1", Get(0, "foo", s1)); ASSERT_EQ("1v1", Get(1, "foo", s1)); ASSERT_EQ("0v2", Get(0, "foo", s2)); ASSERT_EQ("1v2", Get(1, "foo", s2)); ASSERT_EQ("0v4", Get(0, "foo")); ASSERT_EQ("1v4", Get(1, "foo")); db_->ReleaseSnapshot(s1); ASSERT_EQ("0v2", Get(0, "foo", s2)); ASSERT_EQ("1v2", Get(1, "foo", s2)); ASSERT_EQ("0v4", Get(0, "foo")); ASSERT_EQ("1v4", Get(1, "foo")); ASSERT_EQ(1U, GetNumSnapshots()); ASSERT_LT(time_snap1, GetTimeOldestSnapshots()); ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber()); db_->ReleaseSnapshot(s2); ASSERT_EQ(0U, GetNumSnapshots()); ASSERT_EQ(GetSequenceOldestSnapshots(), 0); ASSERT_EQ("0v4", Get(0, "foo")); ASSERT_EQ("1v4", Get(1, "foo")); } while (ChangeOptions()); } #endif // ROCKSDB_LITE TEST_F(DBBasicTest, CompactBetweenSnapshots) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; do { Options options = CurrentOptions(options_override); options.disable_auto_compactions = true; 
CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); FillLevels("a", "z", 1); Put(1, "foo", "first"); const Snapshot* snapshot1 = db_->GetSnapshot(); Put(1, "foo", "second"); Put(1, "foo", "third"); Put(1, "foo", "fourth"); const Snapshot* snapshot2 = db_->GetSnapshot(); Put(1, "foo", "fifth"); Put(1, "foo", "sixth"); // All entries (including duplicates) exist // before any compaction or flush is triggered. ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fifth, fourth, third, second, first ]"); ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("fourth", Get(1, "foo", snapshot2)); ASSERT_EQ("first", Get(1, "foo", snapshot1)); // After a flush, "second", "third" and "fifth" should // be removed ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]"); // after we release the snapshot1, only two values left db_->ReleaseSnapshot(snapshot1); FillLevels("a", "z", 1); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); // We have only one valid snapshot snapshot2. Since snapshot1 is // not valid anymore, "first" should be removed by a compaction. 
ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ("fourth", Get(1, "foo", snapshot2)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]"); // after we release the snapshot2, only one value should be left db_->ReleaseSnapshot(snapshot2); FillLevels("a", "z", 1); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); } while (ChangeOptions(kSkipFIFOCompaction)); } TEST_F(DBBasicTest, DBOpen_Options) { Options options = CurrentOptions(); Close(); Destroy(options); // Does not exist, and create_if_missing == false: error DB* db = nullptr; options.create_if_missing = false; Status s = DB::Open(options, dbname_, &db); ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr); ASSERT_TRUE(db == nullptr); // Does not exist, and create_if_missing == true: OK options.create_if_missing = true; s = DB::Open(options, dbname_, &db); ASSERT_OK(s); ASSERT_TRUE(db != nullptr); delete db; db = nullptr; // Does exist, and error_if_exists == true: error options.create_if_missing = false; options.error_if_exists = true; s = DB::Open(options, dbname_, &db); ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr); ASSERT_TRUE(db == nullptr); // Does exist, and error_if_exists == false: OK options.create_if_missing = true; options.error_if_exists = false; s = DB::Open(options, dbname_, &db); ASSERT_OK(s); ASSERT_TRUE(db != nullptr); delete db; db = nullptr; } TEST_F(DBBasicTest, CompactOnFlush) { anon::OptionsOverride options_override; options_override.skip_policy = kSkipNoSnapshot; do { Options options = CurrentOptions(options_override); options.disable_auto_compactions = true; CreateAndReopenWithCF({"pikachu"}, options); Put(1, "foo", "v1"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]"); // Write two new keys Put(1, "a", "begin"); Put(1, "z", "end"); Flush(1); // Case1: Delete followed by a put Delete(1, "foo"); Put(1, "foo", "v2"); 
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]"); // After the current memtable is flushed, the DEL should // have been removed ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]"); // Case 2: Delete followed by another delete Delete(1, "foo"); Delete(1, "foo"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 3: Put followed by a delete Put(1, "foo", "v3"); Delete(1, "foo"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 4: Put followed by another Put Put(1, "foo", "v4"); Put(1, "foo", "v5"); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]"); // clear database Delete(1, "foo"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: Put followed by snapshot followed by another Put // Both puts should remain. Put(1, "foo", "v6"); const Snapshot* snapshot = db_->GetSnapshot(); Put(1, "foo", "v7"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]"); db_->ReleaseSnapshot(snapshot); // clear database Delete(1, "foo"); dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]"); // Case 5: snapshot followed by a put followed by another Put // Only the last put should remain. 
const Snapshot* snapshot1 = db_->GetSnapshot(); Put(1, "foo", "v8"); Put(1, "foo", "v9"); ASSERT_OK(Flush(1)); ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]"); db_->ReleaseSnapshot(snapshot1); } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, FlushOneColumnFamily) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, options); ASSERT_OK(Put(0, "Default", "Default")); ASSERT_OK(Put(1, "pikachu", "pikachu")); ASSERT_OK(Put(2, "ilya", "ilya")); ASSERT_OK(Put(3, "muromec", "muromec")); ASSERT_OK(Put(4, "dobrynia", "dobrynia")); ASSERT_OK(Put(5, "nikitich", "nikitich")); ASSERT_OK(Put(6, "alyosha", "alyosha")); ASSERT_OK(Put(7, "popovich", "popovich")); for (int i = 0; i < 8; ++i) { Flush(i); auto tables = ListTableFiles(env_, dbname_); ASSERT_EQ(tables.size(), i + 1U); } } TEST_F(DBBasicTest, MultiGetSimple) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); SetPerfLevel(kEnableCount); ASSERT_OK(Put(1, "k1", "v1")); ASSERT_OK(Put(1, "k2", "v2")); ASSERT_OK(Put(1, "k3", "v3")); ASSERT_OK(Put(1, "k4", "v4")); ASSERT_OK(Delete(1, "k4")); ASSERT_OK(Put(1, "k5", "v5")); ASSERT_OK(Delete(1, "no_key")); std::vector keys({"k1", "k2", "k3", "k4", "k5", "no_key"}); std::vector values(20, "Temporary data to be overwritten"); std::vector cfs(keys.size(), handles_[1]); get_perf_context()->Reset(); std::vector s = db_->MultiGet(ReadOptions(), cfs, keys, &values); ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(values[0], "v1"); ASSERT_EQ(values[1], "v2"); ASSERT_EQ(values[2], "v3"); ASSERT_EQ(values[4], "v5"); // four kv pairs * two bytes per value ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes); ASSERT_OK(s[0]); ASSERT_OK(s[1]); ASSERT_OK(s[2]); ASSERT_TRUE(s[3].IsNotFound()); ASSERT_OK(s[4]); ASSERT_TRUE(s[5].IsNotFound()); SetPerfLevel(kDisable); } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, MultiGetEmpty) { do { CreateAndReopenWithCF({"pikachu"}, 
CurrentOptions()); // Empty Key Set std::vector keys; std::vector values; std::vector cfs; std::vector s = db_->MultiGet(ReadOptions(), cfs, keys, &values); ASSERT_EQ(s.size(), 0U); // Empty Database, Empty Key Set Options options = CurrentOptions(); options.create_if_missing = true; DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); s = db_->MultiGet(ReadOptions(), cfs, keys, &values); ASSERT_EQ(s.size(), 0U); // Empty Database, Search for Keys keys.resize(2); keys[0] = "a"; keys[1] = "b"; cfs.push_back(handles_[0]); cfs.push_back(handles_[1]); s = db_->MultiGet(ReadOptions(), cfs, keys, &values); ASSERT_EQ(static_cast(s.size()), 2); ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound()); } while (ChangeCompactOptions()); } TEST_F(DBBasicTest, ChecksumTest) { BlockBasedTableOptions table_options; Options options = CurrentOptions(); // change when new checksum type added int max_checksum = static_cast(kxxHash64); const int kNumPerFile = 2; // generate one table with each type of checksum for (int i = 0; i <= max_checksum; ++i) { table_options.checksum = static_cast(i); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Reopen(options); for (int j = 0; j < kNumPerFile; ++j) { ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j))); } ASSERT_OK(Flush()); } // with each valid checksum type setting... for (int i = 0; i <= max_checksum; ++i) { table_options.checksum = static_cast(i); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); Reopen(options); // verify every type of checksum (should be regardless of that setting) for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) { ASSERT_EQ(Key(j), Get(Key(j))); } } } // On Windows you can have either memory mapped file or a file // with unbuffered access. 
So this asserts and does not make // sense to run #ifndef OS_WIN TEST_F(DBBasicTest, MmapAndBufferOptions) { if (!IsMemoryMappedAccessSupported()) { return; } Options options = CurrentOptions(); options.use_direct_reads = true; options.allow_mmap_reads = true; ASSERT_NOK(TryReopen(options)); // All other combinations are acceptable options.use_direct_reads = false; ASSERT_OK(TryReopen(options)); if (IsDirectIOSupported()) { options.use_direct_reads = true; options.allow_mmap_reads = false; ASSERT_OK(TryReopen(options)); } options.use_direct_reads = false; ASSERT_OK(TryReopen(options)); } #endif class TestEnv : public EnvWrapper { public: explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {} class TestLogger : public Logger { public: using Logger::Logv; explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } ~TestLogger() override { if (!closed_) { CloseHelper(); } } void Logv(const char* /*format*/, va_list /*ap*/) override {} protected: Status CloseImpl() override { return CloseHelper(); } private: Status CloseHelper() { env->CloseCountInc(); ; return Status::IOError(); } TestEnv* env; }; void CloseCountInc() { close_count++; } int GetCloseCount() { return close_count; } Status NewLogger(const std::string& /*fname*/, std::shared_ptr* result) override { result->reset(new TestLogger(this)); return Status::OK(); } private: int close_count; }; TEST_F(DBBasicTest, DBClose) { Options options = GetDefaultOptions(); std::string dbname = test::PerThreadDBPath("db_close_test"); ASSERT_OK(DestroyDB(dbname, options)); DB* db = nullptr; TestEnv* env = new TestEnv(env_); std::unique_ptr local_env_guard(env); options.create_if_missing = true; options.env = env; Status s = DB::Open(options, dbname, &db); ASSERT_OK(s); ASSERT_TRUE(db != nullptr); s = db->Close(); ASSERT_EQ(env->GetCloseCount(), 1); ASSERT_EQ(s, Status::IOError()); delete db; ASSERT_EQ(env->GetCloseCount(), 1); // Do not call DB::Close() and ensure our logger Close() still gets 
called s = DB::Open(options, dbname, &db); ASSERT_OK(s); ASSERT_TRUE(db != nullptr); delete db; ASSERT_EQ(env->GetCloseCount(), 2); // Provide our own logger and ensure DB::Close() does not close it options.info_log.reset(new TestEnv::TestLogger(env)); options.create_if_missing = false; s = DB::Open(options, dbname, &db); ASSERT_OK(s); ASSERT_TRUE(db != nullptr); s = db->Close(); ASSERT_EQ(s, Status::OK()); delete db; ASSERT_EQ(env->GetCloseCount(), 2); options.info_log.reset(); ASSERT_EQ(env->GetCloseCount(), 3); } TEST_F(DBBasicTest, DBCloseFlushError) { std::unique_ptr fault_injection_env( new FaultInjectionTestEnv(env_)); Options options = GetDefaultOptions(); options.create_if_missing = true; options.manual_wal_flush = true; options.write_buffer_size=100; options.env = fault_injection_env.get(); Reopen(options); ASSERT_OK(Put("key1", "value1")); ASSERT_OK(Put("key2", "value2")); ASSERT_OK(dbfull()->TEST_SwitchMemtable()); ASSERT_OK(Put("key3", "value3")); fault_injection_env->SetFilesystemActive(false); Status s = dbfull()->Close(); fault_injection_env->SetFilesystemActive(true); ASSERT_NE(s, Status::OK()); Destroy(options); } class DBMultiGetTestWithParam : public DBBasicTest, public testing::WithParamInterface {}; TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, options); // tuples std::vector> cf_kv_vec; static const int num_keys = 24; cf_kv_vec.reserve(num_keys); for (int i = 0; i < num_keys; ++i) { int cf = i / 3; int cf_key = 1 % 3; cf_kv_vec.emplace_back(std::make_tuple( cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key), "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key))); ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]), std::get<2>(cf_kv_vec[i]))); } int get_sv_count = 0; ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast(db_); 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { if (++get_sv_count == 2) { // After MultiGet refs a couple of CFs, flush all CFs so MultiGet // is forced to repeat the process for (int i = 0; i < num_keys; ++i) { int cf = i / 3; int cf_key = i % 8; if (cf_key == 0) { ASSERT_OK(Flush(cf)); } ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]), std::get<2>(cf_kv_vec[i]) + "_2")); } } if (get_sv_count == 11) { for (int i = 0; i < 8; ++i) { auto* cfd = reinterpret_cast( db->GetColumnFamilyHandle(i)) ->cfd(); ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); } } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); std::vector cfs; std::vector keys; std::vector values; for (int i = 0; i < num_keys; ++i) { cfs.push_back(std::get<0>(cf_kv_vec[i])); keys.push_back(std::get<1>(cf_kv_vec[i])); } values = MultiGet(cfs, keys, nullptr, GetParam()); ASSERT_EQ(values.size(), num_keys); for (unsigned int j = 0; j < values.size(); ++j) { ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2"); } keys.clear(); cfs.clear(); cfs.push_back(std::get<0>(cf_kv_vec[0])); keys.push_back(std::get<1>(cf_kv_vec[0])); cfs.push_back(std::get<0>(cf_kv_vec[3])); keys.push_back(std::get<1>(cf_kv_vec[3])); cfs.push_back(std::get<0>(cf_kv_vec[4])); keys.push_back(std::get<1>(cf_kv_vec[4])); values = MultiGet(cfs, keys, nullptr, GetParam()); ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2"); ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2"); ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2"); keys.clear(); cfs.clear(); cfs.push_back(std::get<0>(cf_kv_vec[7])); keys.push_back(std::get<1>(cf_kv_vec[7])); cfs.push_back(std::get<0>(cf_kv_vec[6])); keys.push_back(std::get<1>(cf_kv_vec[6])); cfs.push_back(std::get<0>(cf_kv_vec[1])); keys.push_back(std::get<1>(cf_kv_vec[1])); values = MultiGet(cfs, keys, nullptr, GetParam()); ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2"); 
ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2"); ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2"); for (int cf = 0; cf < 8; ++cf) { auto* cfd = reinterpret_cast( reinterpret_cast(db_)->GetColumnFamilyHandle(cf)) ->cfd(); ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete); } } TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, options); for (int i = 0; i < 8; ++i) { ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key", "cf" + std::to_string(i) + "_val")); } int get_sv_count = 0; int retries = 0; bool last_try = false; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) { if (last_try) { return; } if (++get_sv_count == 2) { ++retries; get_sv_count = 0; for (int i = 0; i < 8; ++i) { ASSERT_OK(Flush(i)); ASSERT_OK(Put( i, "cf" + std::to_string(i) + "_key", "cf" + std::to_string(i) + "_val" + std::to_string(retries))); } } }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); std::vector cfs; std::vector keys; std::vector values; for (int i = 0; i < 8; ++i) { cfs.push_back(i); keys.push_back("cf" + std::to_string(i) + "_key"); } values = MultiGet(cfs, keys, nullptr, GetParam()); ASSERT_TRUE(last_try); ASSERT_EQ(values.size(), 8); for (unsigned int j = 0; j < values.size(); ++j) { ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val" + std::to_string(retries)); } for (int i = 0; i < 8; ++i) { auto* cfd = reinterpret_cast( reinterpret_cast(db_)->GetColumnFamilyHandle(i)) ->cfd(); ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse); } } 
// MultiGet with an explicit snapshot: a sync-point callback flushes and
// overwrites every key with "_val2" during the lookup, but the values
// returned must still be the "_val" ones written before the snapshot.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}

INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                        testing::Bool());

// Batched MultiGet on one column family with the keys passed in unsorted
// order (last argument false). Checks values, per-key statuses and the
// multiget_read_bytes perf counter.
TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);
    ASSERT_OK(s[5]);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

// Same as MultiGetBatchedSimpleUnsorted, but the keys are passed in sorted
// order and the last MultiGet argument is true.
TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), true);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}

// Spreads versions of the same keys over level 2, level 1, level 0 and the
// memtable (every key, every 3rd, every 5th, every 9th) and checks that
// MultiGet returns the version written last for each key.
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);
  int num_keys = 0;

  // Oldest versions: flushed eight keys per file, then moved to level 2.
  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    ASSERT_OK(Flush());
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  // Newest versions are not flushed.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), 16);
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    // Check the most recently written source first.
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}

// Same layout as MultiGetBatchedMultiLevel, but the newer layers are written
// with Merge() and a string-append merge operator.
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
BlockBasedTableOptions bbto; bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); Reopen(options); int num_keys = 0; for (int i = 0; i < 128; ++i) { ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i))); num_keys++; if (num_keys == 8) { Flush(); num_keys = 0; } } if (num_keys > 0) { Flush(); num_keys = 0; } MoveFilesToLevel(2); for (int i = 0; i < 128; i += 3) { ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i))); num_keys++; if (num_keys == 8) { Flush(); num_keys = 0; } } if (num_keys > 0) { Flush(); num_keys = 0; } MoveFilesToLevel(1); for (int i = 0; i < 128; i += 5) { ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i))); num_keys++; if (num_keys == 8) { Flush(); num_keys = 0; } } if (num_keys > 0) { Flush(); num_keys = 0; } ASSERT_EQ(0, num_keys); for (int i = 0; i < 128; i += 9) { ASSERT_OK(Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i))); } std::vector keys; std::vector values; for (int i = 32; i < 80; ++i) { keys.push_back("key_" + std::to_string(i)); } values = MultiGet(keys, nullptr); ASSERT_EQ(values.size(), keys.size()); for (unsigned int j = 0; j < 48; ++j) { int key = j + 32; std::string value; value.append("val_l2_" + std::to_string(key)); if (key % 3 == 0) { value.append(","); value.append("val_l1_" + std::to_string(key)); } if (key % 5 == 0) { value.append(","); value.append("val_l0_" + std::to_string(key)); } if (key % 9 == 0) { value.append(","); value.append("val_mem_" + std::to_string(key)); } ASSERT_EQ(values[j], value); } } // Test class for batched MultiGet with prefix extractor // Param bool - If true, use partitioned filters // If false, use full filter block class MultiGetPrefixExtractorTest : public DBBasicTest, public ::testing::WithParamInterface { }; TEST_P(MultiGetPrefixExtractorTest, Batched) { Options options = CurrentOptions(); 
options.prefix_extractor.reset(NewFixedPrefixTransform(2)); options.memtable_prefix_bloom_size_ratio = 10; BlockBasedTableOptions bbto; if (GetParam()) { bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; bbto.partition_filters = true; } bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); bbto.whole_key_filtering = false; bbto.cache_index_and_filter_blocks = false; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); Reopen(options); SetPerfLevel(kEnableCount); get_perf_context()->Reset(); // First key is not in the prefix_extractor domain ASSERT_OK(Put("k", "v0")); ASSERT_OK(Put("kk1", "v1")); ASSERT_OK(Put("kk2", "v2")); ASSERT_OK(Put("kk3", "v3")); ASSERT_OK(Put("kk4", "v4")); std::vector mem_keys( {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"}); std::vector inmem_values; inmem_values = MultiGet(mem_keys, nullptr); ASSERT_EQ(inmem_values[0], "v0"); ASSERT_EQ(inmem_values[1], "v1"); ASSERT_EQ(inmem_values[2], "v2"); ASSERT_EQ(inmem_values[3], "v3"); ASSERT_EQ(inmem_values[4], "v4"); ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2); ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5); ASSERT_OK(Flush()); std::vector keys({"k", "kk1", "kk2", "kk3", "kk4"}); std::vector values; get_perf_context()->Reset(); values = MultiGet(keys, nullptr); ASSERT_EQ(values[0], "v0"); ASSERT_EQ(values[1], "v1"); ASSERT_EQ(values[2], "v2"); ASSERT_EQ(values[3], "v3"); ASSERT_EQ(values[4], "v4"); // Filter hits for 4 in-domain keys ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4); } INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest, ::testing::Bool()); #ifndef ROCKSDB_LITE class DBMultiGetRowCacheTest : public DBBasicTest, public ::testing::WithParamInterface {}; TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) { do { option_config_ = kRowCache; Options options = CurrentOptions(); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); CreateAndReopenWithCF({"pikachu"}, options); 
SetPerfLevel(kEnableCount); ASSERT_OK(Put(1, "k1", "v1")); ASSERT_OK(Put(1, "k2", "v2")); ASSERT_OK(Put(1, "k3", "v3")); ASSERT_OK(Put(1, "k4", "v4")); Flush(1); ASSERT_OK(Put(1, "k5", "v5")); const Snapshot* snap1 = dbfull()->GetSnapshot(); ASSERT_OK(Delete(1, "k4")); Flush(1); const Snapshot* snap2 = dbfull()->GetSnapshot(); get_perf_context()->Reset(); std::vector keys({"no_key", "k5", "k4", "k3", "k1"}); std::vector values(keys.size()); std::vector cfs(keys.size(), handles_[1]); std::vector s(keys.size()); ReadOptions ro; bool use_snapshots = GetParam(); if (use_snapshots) { ro.snapshot = snap2; } db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), s.data(), false); ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1"); ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3"); ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5"); // four kv pairs * two bytes per value ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes); ASSERT_TRUE(s[0].IsNotFound()); ASSERT_OK(s[1]); ASSERT_TRUE(s[2].IsNotFound()); ASSERT_OK(s[3]); ASSERT_OK(s[4]); // Call MultiGet() again with some intersection with the previous set of // keys. Those should already be in the row cache. 
keys.assign({"no_key", "k5", "k3", "k2"}); for (size_t i = 0; i < keys.size(); ++i) { values[i].Reset(); s[i] = Status::OK(); } get_perf_context()->Reset(); if (use_snapshots) { ro.snapshot = snap1; } db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(), values.data(), s.data(), false); ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2"); ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3"); ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5"); // four kv pairs * two bytes per value ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes); ASSERT_TRUE(s[0].IsNotFound()); ASSERT_OK(s[1]); ASSERT_OK(s[2]); ASSERT_OK(s[3]); if (use_snapshots) { // Only reads from the first SST file would have been cached, since // snapshot seq no is > fd.largest_seqno ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT)); } else { ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT)); } SetPerfLevel(kDisable); dbfull()->ReleaseSnapshot(snap1); dbfull()->ReleaseSnapshot(snap2); } while (ChangeCompactOptions()); } INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest, testing::Values(true, false)); TEST_F(DBBasicTest, GetAllKeyVersions) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.disable_auto_compactions = true; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_EQ(2, handles_.size()); const size_t kNumInserts = 4; const size_t kNumDeletes = 4; const size_t kNumUpdates = 4; // Check default column family for (size_t i = 0; i != kNumInserts; ++i) { ASSERT_OK(Put(std::to_string(i), "value")); } for (size_t i = 0; i != kNumUpdates; ++i) { ASSERT_OK(Put(std::to_string(i), "value1")); } for (size_t i = 0; i != kNumDeletes; ++i) { ASSERT_OK(Delete(std::to_string(i))); } std::vector key_versions; ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions( db_, Slice(), Slice(), std::numeric_limits::max(), &key_versions)); ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, 
key_versions.size()); ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions( db_, handles_[0], Slice(), Slice(), std::numeric_limits::max(), &key_versions)); ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size()); // Check non-default column family for (size_t i = 0; i != kNumInserts - 1; ++i) { ASSERT_OK(Put(1, std::to_string(i), "value")); } for (size_t i = 0; i != kNumUpdates - 1; ++i) { ASSERT_OK(Put(1, std::to_string(i), "value1")); } for (size_t i = 0; i != kNumDeletes - 1; ++i) { ASSERT_OK(Delete(1, std::to_string(i))); } ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions( db_, handles_[1], Slice(), Slice(), std::numeric_limits::max(), &key_versions)); ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size()); } #endif // !ROCKSDB_LITE TEST_F(DBBasicTest, MultiGetIOBufferOverrun) { Options options = CurrentOptions(); Random rnd(301); BlockBasedTableOptions table_options; table_options.pin_l0_filter_and_index_blocks_in_cache = true; table_options.block_size = 16 * 1024; assert(table_options.block_size > BlockBasedTable::kMultiGetReadStackBufSize); options.table_factory.reset(new BlockBasedTableFactory(table_options)); Reopen(options); std::string zero_str(128, '\0'); for (int i = 0; i < 100; ++i) { // Make the value compressible. 
A purely random string doesn't compress // and the resultant data block will not be compressed std::string value(RandomString(&rnd, 128) + zero_str); assert(Put(Key(i), value) == Status::OK()); } Flush(); std::vector key_data(10); std::vector keys; // We cannot resize a PinnableSlice vector, so just set initial size to // largest we think we will need std::vector values(10); std::vector statuses; ReadOptions ro; // Warm up the cache first key_data.emplace_back(Key(0)); keys.emplace_back(Slice(key_data.back())); key_data.emplace_back(Key(50)); keys.emplace_back(Slice(key_data.back())); statuses.resize(keys.size()); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); } class DBBasicTestWithParallelIO : public DBTestBase, public testing::WithParamInterface> { public: DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") { bool compressed_cache = std::get<0>(GetParam()); bool uncompressed_cache = std::get<1>(GetParam()); compression_enabled_ = std::get<2>(GetParam()); fill_cache_ = std::get<3>(GetParam()); if (compressed_cache) { std::shared_ptr cache = NewLRUCache(1048576); compressed_cache_ = std::make_shared(cache); } if (uncompressed_cache) { std::shared_ptr cache = NewLRUCache(1048576); uncompressed_cache_ = std::make_shared(cache); } env_->count_random_reads_ = true; Options options = CurrentOptions(); Random rnd(301); BlockBasedTableOptions table_options; #ifndef ROCKSDB_LITE if (compression_enabled_) { std::vector compression_types; compression_types = GetSupportedCompressions(); // Not every platform may have compression libraries available, so // dynamically pick based on what's available if (compression_types.size() == 0) { compression_enabled_ = false; } else { options.compression = compression_types[0]; } } #else // GetSupportedCompressions() is not available in LITE build if (!Snappy_Supported()) { compression_enabled_ = false; } #endif //ROCKSDB_LITE 
table_options.block_cache = uncompressed_cache_; if (table_options.block_cache == nullptr) { table_options.no_block_cache = true; } else { table_options.pin_l0_filter_and_index_blocks_in_cache = true; } table_options.block_cache_compressed = compressed_cache_; table_options.flush_block_policy_factory.reset( new MyFlushBlockPolicyFactory()); options.table_factory.reset(new BlockBasedTableFactory(table_options)); if (!compression_enabled_) { options.compression = kNoCompression; } Reopen(options); std::string zero_str(128, '\0'); for (int i = 0; i < 100; ++i) { // Make the value compressible. A purely random string doesn't compress // and the resultant data block will not be compressed values_.emplace_back(RandomString(&rnd, 128) + zero_str); assert(Put(Key(i), values_[i]) == Status::OK()); } Flush(); for (int i = 0; i < 100; ++i) { // block cannot gain space by compression uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0'); std::string tmp_key = "a" + Key(i); assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK()); } Flush(); } bool CheckValue(int i, const std::string& value) { if (values_[i].compare(value) == 0) { return true; } return false; } bool CheckUncompressableValue(int i, const std::string& value) { if (uncompressable_values_[i].compare(value) == 0) { return true; } return false; } int num_lookups() { return uncompressed_cache_->num_lookups(); } int num_found() { return uncompressed_cache_->num_found(); } int num_inserts() { return uncompressed_cache_->num_inserts(); } int num_lookups_compressed() { return compressed_cache_->num_lookups(); } int num_found_compressed() { return compressed_cache_->num_found(); } int num_inserts_compressed() { return compressed_cache_->num_inserts(); } bool fill_cache() { return fill_cache_; } bool compression_enabled() { return compression_enabled_; } bool has_compressed_cache() { return compressed_cache_ != nullptr; } bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; } static 
void SetUpTestCase() {} static void TearDownTestCase() {} private: class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory { public: MyFlushBlockPolicyFactory() {} virtual const char* Name() const override { return "MyFlushBlockPolicyFactory"; } virtual FlushBlockPolicy* NewFlushBlockPolicy( const BlockBasedTableOptions& /*table_options*/, const BlockBuilder& data_block_builder) const override { return new MyFlushBlockPolicy(data_block_builder); } }; class MyFlushBlockPolicy : public FlushBlockPolicy { public: explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder) : num_keys_(0), data_block_builder_(data_block_builder) {} bool Update(const Slice& /*key*/, const Slice& /*value*/) override { if (data_block_builder_.empty()) { // First key in this block num_keys_ = 1; return false; } // Flush every 10 keys if (num_keys_ == 10) { num_keys_ = 1; return true; } num_keys_++; return false; } private: int num_keys_; const BlockBuilder& data_block_builder_; }; class MyBlockCache : public Cache { public: explicit MyBlockCache(std::shared_ptr& target) : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {} virtual const char* Name() const override { return "MyBlockCache"; } virtual Status Insert(const Slice& key, void* value, size_t charge, void (*deleter)(const Slice& key, void* value), Handle** handle = nullptr, Priority priority = Priority::LOW) override { num_inserts_++; return target_->Insert(key, value, charge, deleter, handle, priority); } virtual Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override { num_lookups_++; Handle* handle = target_->Lookup(key, stats); if (handle != nullptr) { num_found_++; } return handle; } virtual bool Ref(Handle* handle) override { return target_->Ref(handle); } virtual bool Release(Handle* handle, bool force_erase = false) override { return target_->Release(handle, force_erase); } virtual void* Value(Handle* handle) override { return target_->Value(handle); } virtual void Erase(const 
Slice& key) override { target_->Erase(key); } virtual uint64_t NewId() override { return target_->NewId(); } virtual void SetCapacity(size_t capacity) override { target_->SetCapacity(capacity); } virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override { target_->SetStrictCapacityLimit(strict_capacity_limit); } virtual bool HasStrictCapacityLimit() const override { return target_->HasStrictCapacityLimit(); } virtual size_t GetCapacity() const override { return target_->GetCapacity(); } virtual size_t GetUsage() const override { return target_->GetUsage(); } virtual size_t GetUsage(Handle* handle) const override { return target_->GetUsage(handle); } virtual size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); } virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; } virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), bool thread_safe) override { return target_->ApplyToAllCacheEntries(callback, thread_safe); } virtual void EraseUnRefEntries() override { return target_->EraseUnRefEntries(); } int num_lookups() { return num_lookups_; } int num_found() { return num_found_; } int num_inserts() { return num_inserts_; } private: std::shared_ptr target_; int num_lookups_; int num_found_; int num_inserts_; }; std::shared_ptr compressed_cache_; std::shared_ptr uncompressed_cache_; bool compression_enabled_; std::vector values_; std::vector uncompressable_values_; bool fill_cache_; }; TEST_P(DBBasicTestWithParallelIO, MultiGet) { std::vector key_data(10); std::vector keys; // We cannot resize a PinnableSlice vector, so just set initial size to // largest we think we will need std::vector values(10); std::vector statuses; ReadOptions ro; ro.fill_cache = fill_cache(); // Warm up the cache first key_data.emplace_back(Key(0)); keys.emplace_back(Slice(key_data.back())); key_data.emplace_back(Key(50)); keys.emplace_back(Slice(key_data.back())); statuses.resize(keys.size()); dbfull()->MultiGet(ro, 
dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); ASSERT_TRUE(CheckValue(0, values[0].ToString())); ASSERT_TRUE(CheckValue(50, values[1].ToString())); int random_reads = env_->random_read_counter_.Read(); key_data[0] = Key(1); key_data[1] = Key(51); keys[0] = Slice(key_data[0]); keys[1] = Slice(key_data[1]); values[0].Reset(); values[1].Reset(); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); ASSERT_TRUE(CheckValue(1, values[0].ToString())); ASSERT_TRUE(CheckValue(51, values[1].ToString())); bool read_from_cache = false; if (fill_cache()) { if (has_uncompressed_cache()) { read_from_cache = true; } else if (has_compressed_cache() && compression_enabled()) { read_from_cache = true; } } int expected_reads = random_reads + (read_from_cache ? 0 : 2); ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); keys.resize(10); statuses.resize(10); std::vector key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85}; for (size_t i = 0; i < key_ints.size(); ++i) { key_data[i] = Key(key_ints[i]); keys[i] = Slice(key_data[i]); statuses[i] = Status::OK(); values[i].Reset(); } dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); for (size_t i = 0; i < key_ints.size(); ++i) { ASSERT_OK(statuses[i]); ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString())); } if (compression_enabled() && !has_compressed_cache()) { expected_reads += (read_from_cache ? 2 : 3); } else { expected_reads += (read_from_cache ? 
2 : 4); } ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); keys.resize(10); statuses.resize(10); std::vector key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85}; for (size_t i = 0; i < key_uncmp.size(); ++i) { key_data[i] = "a" + Key(key_uncmp[i]); keys[i] = Slice(key_data[i]); statuses[i] = Status::OK(); values[i].Reset(); } dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); for (size_t i = 0; i < key_uncmp.size(); ++i) { ASSERT_OK(statuses[i]); ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString())); } if (compression_enabled() && !has_compressed_cache()) { expected_reads += (read_from_cache ? 3 : 3); } else { expected_reads += (read_from_cache ? 4 : 4); } ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); keys.resize(5); statuses.resize(5); std::vector key_tr{1, 2, 15, 16, 55}; for (size_t i = 0; i < key_tr.size(); ++i) { key_data[i] = "a" + Key(key_tr[i]); keys[i] = Slice(key_data[i]); statuses[i] = Status::OK(); values[i].Reset(); } dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); for (size_t i = 0; i < key_tr.size(); ++i) { ASSERT_OK(statuses[i]); ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString())); } if (compression_enabled() && !has_compressed_cache()) { expected_reads += (read_from_cache ? 0 : 2); ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); } else { if (has_uncompressed_cache()) { expected_reads += (read_from_cache ? 0 : 3); ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads); } else { // A rare case, even we enable the block compression but some of data // blocks are not compressed due to content. If user only enable the // compressed cache, the uncompressed blocks will not tbe cached, and // block reads will be triggered. The number of reads is related to // the compression algorithm. 
ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads); } } } TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) { std::vector key_data(10); std::vector keys; // We cannot resize a PinnableSlice vector, so just set initial size to // largest we think we will need std::vector values(10); std::vector statuses; int read_count = 0; ReadOptions ro; ro.fill_cache = fill_cache(); SyncPoint::GetInstance()->SetCallBack( "RetrieveMultipleBlocks:VerifyChecksum", [&](void *status) { Status* s = static_cast(status); read_count++; if (read_count == 2) { *s = Status::Corruption(); } }); SyncPoint::GetInstance()->EnableProcessing(); // Warm up the cache first key_data.emplace_back(Key(0)); keys.emplace_back(Slice(key_data.back())); key_data.emplace_back(Key(50)); keys.emplace_back(Slice(key_data.back())); statuses.resize(keys.size()); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); ASSERT_TRUE(CheckValue(0, values[0].ToString())); //ASSERT_TRUE(CheckValue(50, values[1].ToString())); ASSERT_EQ(statuses[0], Status::OK()); ASSERT_EQ(statuses[1], Status::Corruption()); SyncPoint::GetInstance()->DisableProcessing(); } TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) { std::vector key_data(10); std::vector keys; // We cannot resize a PinnableSlice vector, so just set initial size to // largest we think we will need std::vector values(10); std::vector statuses; ReadOptions ro; ro.fill_cache = fill_cache(); SyncPoint::GetInstance()->SetCallBack( "TableCache::MultiGet:FindTable", [&](void *status) { Status* s = static_cast(status); *s = Status::IOError(); }); // DB open will create table readers unless we reduce the table cache // capacity. // SanitizeOptions will set max_open_files to minimum of 20. Table cache // is allocated with max_open_files - 10 as capacity. So override // max_open_files to 11 so table cache capacity will become 1. 
This will // prevent file open during DB open and force the file to be opened // during MultiGet SyncPoint::GetInstance()->SetCallBack( "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void *arg) { int* max_open_files = (int*)arg; *max_open_files = 11; }); SyncPoint::GetInstance()->EnableProcessing(); Reopen(CurrentOptions()); // Warm up the cache first key_data.emplace_back(Key(0)); keys.emplace_back(Slice(key_data.back())); key_data.emplace_back(Key(50)); keys.emplace_back(Slice(key_data.back())); statuses.resize(keys.size()); dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data(), true); ASSERT_EQ(statuses[0], Status::IOError()); ASSERT_EQ(statuses[1], Status::IOError()); SyncPoint::GetInstance()->DisableProcessing(); } INSTANTIATE_TEST_CASE_P( ParallelIO, DBBasicTestWithParallelIO, // Params are as follows - // Param 0 - Compressed cache enabled // Param 1 - Uncompressed cache enabled // Param 2 - Data compression enabled // Param 3 - ReadOptions::fill_cache ::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Bool(), ::testing::Bool())); class DBBasicTestWithTimestampBase : public DBTestBase { public: explicit DBBasicTestWithTimestampBase(const std::string& dbname) : DBTestBase(dbname) {} protected: static std::string Key1(uint64_t k) { uint32_t x = 1; const bool is_little_endian = (*reinterpret_cast(&x) != 0); std::string ret; if (is_little_endian) { ret.assign(reinterpret_cast(&k), sizeof(k)); } else { ret.resize(sizeof(k)); ret[0] = k & 0xff; ret[1] = (k >> 8) & 0xff; ret[2] = (k >> 16) & 0xff; ret[3] = (k >> 24) & 0xff; ret[4] = (k >> 32) & 0xff; ret[5] = (k >> 40) & 0xff; ret[6] = (k >> 48) & 0xff; ret[7] = (k >> 56) & 0xff; } std::reverse(ret.begin(), ret.end()); return ret; } class TestComparator : public Comparator { private: const Comparator* cmp_without_ts_; public: explicit TestComparator(size_t ts_sz) : Comparator(ts_sz), cmp_without_ts_(nullptr) { cmp_without_ts_ = 
BytewiseComparator(); } const char* Name() const override { return "TestComparator"; } void FindShortSuccessor(std::string*) const override {} void FindShortestSeparator(std::string*, const Slice&) const override {} int Compare(const Slice& a, const Slice& b) const override { int r = CompareWithoutTimestamp(a, b); if (r != 0 || 0 == timestamp_size()) { return r; } return -CompareTimestamp( Slice(a.data() + a.size() - timestamp_size(), timestamp_size()), Slice(b.data() + b.size() - timestamp_size(), timestamp_size())); } using Comparator::CompareWithoutTimestamp; int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b, bool b_has_ts) const override { if (a_has_ts) { assert(a.size() >= timestamp_size()); } if (b_has_ts) { assert(b.size() >= timestamp_size()); } Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a; Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b; return cmp_without_ts_->Compare(lhs, rhs); } int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override { if (!ts1.data() && !ts2.data()) { return 0; } else if (ts1.data() && !ts2.data()) { return 1; } else if (!ts1.data() && ts2.data()) { return -1; } assert(ts1.size() == ts2.size()); uint64_t low1 = 0; uint64_t low2 = 0; uint64_t high1 = 0; uint64_t high2 = 0; const size_t kSize = ts1.size(); std::unique_ptr ts1_buf(new char[kSize]); memcpy(ts1_buf.get(), ts1.data(), ts1.size()); std::unique_ptr ts2_buf(new char[kSize]); memcpy(ts2_buf.get(), ts2.data(), ts2.size()); Slice ts1_copy = Slice(ts1_buf.get(), kSize); Slice ts2_copy = Slice(ts2_buf.get(), kSize); auto* ptr1 = const_cast(&ts1_copy); auto* ptr2 = const_cast(&ts2_copy); if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) || !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) { assert(false); } if (high1 < high2) { return -1; } else if (high1 > high2) { return 1; } if (low1 < low2) { return -1; } else if (low1 > low2) { return 1; } return 0; } }; std::string 
Timestamp(uint64_t low, uint64_t high) { std::string ts; PutFixed64(&ts, low); PutFixed64(&ts, high); return ts; } void CheckIterUserEntry(const Iterator* it, const Slice& expected_key, const Slice& expected_value, const Slice& expected_ts) const { ASSERT_TRUE(it->Valid()); ASSERT_OK(it->status()); ASSERT_EQ(expected_key, it->key()); ASSERT_EQ(expected_value, it->value()); ASSERT_EQ(expected_ts, it->timestamp()); } void CheckIterEntry(const Iterator* it, const Slice& expected_ukey, SequenceNumber expected_seq, ValueType expected_val_type, const Slice& expected_value, const Slice& expected_ts) { ASSERT_TRUE(it->Valid()); ASSERT_OK(it->status()); std::string ukey_and_ts; ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size()); ukey_and_ts.append(expected_ts.data(), expected_ts.size()); ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type); std::string ikey; AppendInternalKey(&ikey, parsed_ikey); ASSERT_EQ(Slice(ikey), it->key()); if (expected_val_type == kTypeValue) { ASSERT_EQ(expected_value, it->value()); } ASSERT_EQ(expected_ts, it->timestamp()); } }; class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase { public: DBBasicTestWithTimestamp() : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {} }; TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { const int kNumKeysPerFile = 2048; const uint64_t kMaxKey = 16384; Options options = CurrentOptions(); options.env = env_; // TODO(yanqin) re-enable auto compaction options.disable_auto_compactions = true; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); DestroyAndReopen(options); const std::vector start_keys = {1, 0}; const std::vector write_timestamps = {Timestamp(1, 0), Timestamp(3, 0)}; const std::vector read_timestamps = {Timestamp(2, 0), Timestamp(4, 0)}; for (size_t i = 
0; i < write_timestamps.size(); ++i) { WriteOptions write_opts; Slice write_ts = write_timestamps[i]; write_opts.timestamp = &write_ts; for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) { Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); ASSERT_OK(s); } } for (size_t i = 0; i < read_timestamps.size(); ++i) { ReadOptions read_opts; Slice read_ts = read_timestamps[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); int count = 0; uint64_t key = 0; for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid(); it->Next(), ++count, ++key) { CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i), write_timestamps[i]); } size_t expected_count = kMaxKey - start_keys[i] + 1; ASSERT_EQ(expected_count, count); // SeekToFirst() with lower bound. // Then iter with lower and upper bounds. uint64_t l = 0; uint64_t r = kMaxKey + 1; while (l < r) { std::string lb_str = Key1(l); Slice lb = lb_str; std::string ub_str = Key1(r); Slice ub = ub_str; read_opts.iterate_lower_bound = &lb; read_opts.iterate_upper_bound = &ub; it.reset(db_->NewIterator(read_opts)); for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0; it->Valid(); it->Next(), ++key, ++count) { CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i), write_timestamps[i]); } ASSERT_EQ(r - std::max(l, start_keys[i]), count); l += (kMaxKey / 100); r -= (kMaxKey / 100); } } Close(); } TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { const int kNumKeysPerFile = 2048; const uint64_t kMaxKey = 0xffffffffffffffff; const uint64_t kMinKey = kMaxKey - 16383; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; // TODO(yanqin) re-enable auto compaction options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.memtable_factory.reset(new 
SpecialSkipListFactory(kNumKeysPerFile)); DestroyAndReopen(options); std::vector start_seqs; const int kNumTimestamps = 4; std::vector write_ts_list; for (int t = 0; t != kNumTimestamps; ++t) { write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17)); } WriteOptions write_opts; for (size_t i = 0; i != write_ts_list.size(); ++i) { Slice write_ts = write_ts_list[i]; write_opts.timestamp = &write_ts; uint64_t k = kMinKey; do { Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i)); ASSERT_OK(s); if (k == kMaxKey) { break; } ++k; } while (k != 0); start_seqs.push_back(db_->GetLatestSequenceNumber()); } std::vector read_ts_list; for (int t = 0; t != kNumTimestamps - 1; ++t) { read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17)); } ReadOptions read_opts; for (size_t i = 0; i != read_ts_list.size(); ++i) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; read_opts.iter_start_seqnum = start_seqs[i]; std::unique_ptr iter(db_->NewIterator(read_opts)); SequenceNumber expected_seq = start_seqs[i] + 1; uint64_t key = kMinKey; for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) { CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue, "value" + std::to_string(i + 1), write_ts_list[i + 1]); ++key; ++expected_seq; } } Close(); } TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); // TODO(yanqin) re-enable auto compaction options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Insert kNumKeys WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { std::string ts_str = Timestamp(static_cast(i + 1), 0); Slice ts = 
ts_str; write_opts.timestamp = &ts; s = db_->Put(write_opts, "foo", "value" + std::to_string(i)); ASSERT_OK(s); } { ReadOptions read_opts; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToFirst(); CheckIterUserEntry(iter.get(), "foo", "value0", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) { Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; constexpr size_t kNumKeys = 16; options.max_sequential_skip_in_iterations = kNumKeys / 2; options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); // TODO(yanqin) re-enable auto compaction options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); // Write kNumKeys + 1 keys WriteOptions write_opts; Status s; for (size_t i = 0; i != kNumKeys; ++i) { std::string ts_str = Timestamp(static_cast(i + 1), 0); Slice ts = ts_str; write_opts.timestamp = &ts; s = db_->Put(write_opts, "a", "value" + std::to_string(i)); ASSERT_OK(s); } { std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); WriteBatch batch(0, 0, kTimestampSize); batch.Put("a", "new_value"); batch.Put("b", "new_value"); s = batch.AssignTimestamp(ts_str); ASSERT_OK(s); s = db_->Write(write_opts, &batch); ASSERT_OK(s); } { ReadOptions read_opts; std::string ts_str = Timestamp(static_cast(kNumKeys + 1), 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->Seek("a"); iter->Next(); CheckIterUserEntry(iter.get(), "b", "new_value", ts_str); ASSERT_EQ( 1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION)); } Close(); } TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) { Options options = CurrentOptions(); options.env = 
env_; options.create_if_missing = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; DestroyAndReopen(options); constexpr size_t max_skippable_internal_keys = 2; const size_t kNumKeys = max_skippable_internal_keys + 2; WriteOptions write_opts; Status s; { std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; write_opts.timestamp = &ts; ASSERT_OK(db_->Put(write_opts, "a", "value")); } for (size_t i = 0; i < kNumKeys; ++i) { std::string ts_str = Timestamp(static_cast(i + 1), 0); Slice ts = ts_str; write_opts.timestamp = &ts; s = db_->Put(write_opts, "b", "value" + std::to_string(i)); ASSERT_OK(s); } { ReadOptions read_opts; read_opts.max_skippable_internal_keys = max_skippable_internal_keys; std::string ts_str = Timestamp(1, 0); Slice ts = ts_str; read_opts.timestamp = &ts; std::unique_ptr iter(db_->NewIterator(read_opts)); iter->SeekToFirst(); iter->Next(); ASSERT_TRUE(iter->status().IsIncomplete()); } Close(); } class DBBasicTestWithTimestampCompressionSettings : public DBBasicTestWithTimestampBase, public testing::WithParamInterface, CompressionType, uint32_t>> { public: DBBasicTestWithTimestampCompressionSettings() : DBBasicTestWithTimestampBase( "db_basic_test_with_timestamp_compression") {} }; TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) { const int kNumKeysPerFile = 8192; const size_t kNumTimestamps = 6; Options options = CurrentOptions(); options.create_if_missing = true; options.env = env_; options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const CompressionType comp_type = std::get<1>(GetParam()); #if LZ4_VERSION_NUMBER < 10400 // r124+ if (comp_type == 
kLZ4Compression || comp_type == kLZ4HCCompression) { return; } #endif // LZ4_VERSION_NUMBER >= 10400 if (!ZSTD_Supported() && comp_type == kZSTD) { return; } if (!Zlib_Supported() && comp_type == kZlibCompression) { return; } options.compression = comp_type; options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } options.target_file_size_base = 1 << 26; // 64MB DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); std::vector write_ts_list; std::vector read_ts_list; for (size_t i = 0; i != kNumTimestamps; ++i) { write_ts_list.push_back(Timestamp(i * 2, 0)); read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; wopts.timestamp = &write_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { ASSERT_OK(Put(cf, Key1(j), "value_" + std::to_string(j) + "_" + std::to_string(i), wopts)); } } } const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) { std::string value; ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value)); ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value); } } } }; verify_db_func(); Close(); } #ifndef ROCKSDB_LITE // A class which remembers the name of each flushed file. 
class FlushedFileCollector : public EventListener { public: FlushedFileCollector() {} ~FlushedFileCollector() override {} void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { InstrumentedMutexLock lock(&mutex_); flushed_files_.push_back(info.file_path); } std::vector GetFlushedFiles() { std::vector result; { InstrumentedMutexLock lock(&mutex_); result = flushed_files_; } return result; } void ClearFlushedFiles() { InstrumentedMutexLock lock(&mutex_); flushed_files_.clear(); } private: std::vector flushed_files_; InstrumentedMutex mutex_; }; TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { const int kNumKeysPerFile = 8192; const size_t kNumTimestamps = 2; const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps; const size_t kSplitPosBase = kNumKeysPerTimestamp / 2; Options options = CurrentOptions(); options.create_if_missing = true; options.env = env_; options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); FlushedFileCollector* collector = new FlushedFileCollector(); options.listeners.emplace_back(collector); size_t ts_sz = Timestamp(0, 0).size(); TestComparator test_cmp(ts_sz); options.comparator = &test_cmp; BlockBasedTableOptions bbto; bbto.filter_policy = std::get<0>(GetParam()); bbto.whole_key_filtering = true; options.table_factory.reset(NewBlockBasedTableFactory(bbto)); const CompressionType comp_type = std::get<1>(GetParam()); #if LZ4_VERSION_NUMBER < 10400 // r124+ if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) { return; } #endif // LZ4_VERSION_NUMBER >= 10400 if (!ZSTD_Supported() && comp_type == kZSTD) { return; } if (!Zlib_Supported() && comp_type == kZlibCompression) { return; } options.compression = comp_type; options.compression_opts.max_dict_bytes = std::get<2>(GetParam()); if (comp_type == kZSTD) { options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam()); } DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, 
options); size_t num_cfs = handles_.size(); ASSERT_EQ(2, num_cfs); std::vector write_ts_list; std::vector read_ts_list; const auto& verify_record_func = [&](size_t i, size_t k, ColumnFamilyHandle* cfh) { std::string value; std::string timestamp; ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; std::string expected_timestamp = std::string(write_ts_list[i].data(), write_ts_list[i].size()); ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, ×tamp)); ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value); ASSERT_EQ(expected_timestamp, timestamp); }; for (size_t i = 0; i != kNumTimestamps; ++i) { write_ts_list.push_back(Timestamp(i * 2, 0)); read_ts_list.push_back(Timestamp(1 + i * 2, 0)); const Slice write_ts = write_ts_list.back(); WriteOptions wopts; wopts.timestamp = &write_ts; for (int cf = 0; cf != static_cast(num_cfs); ++cf) { size_t memtable_get_start = 0; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { ASSERT_OK(Put(cf, Key1(j), "value_" + std::to_string(j) + "_" + std::to_string(i), wopts)); if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) { for (size_t k = memtable_get_start; k <= j; ++k) { verify_record_func(i, k, handles_[cf]); } memtable_get_start = j + 1; // flush all keys with the same timestamp to two sst files, split at // incremental positions such that lowerlevel[1].smallest.userkey == // higherlevel[0].largest.userkey ASSERT_OK(Flush(cf)); // compact files (2 at each level) to a lower level such that all keys // with the same timestamp is at one level, with newer versions at // higher levels. 
CompactionOptions compact_opt; compact_opt.compression = kNoCompression; db_->CompactFiles(compact_opt, handles_[cf], collector->GetFlushedFiles(), static_cast(kNumTimestamps - i)); collector->ClearFlushedFiles(); } } } } const auto& verify_db_func = [&]() { for (size_t i = 0; i != kNumTimestamps; ++i) { ReadOptions ropts; const Slice read_ts = read_ts_list[i]; ropts.timestamp = &read_ts; std::string expected_timestamp(write_ts_list[i].data(), write_ts_list[i].size()); for (int cf = 0; cf != static_cast(num_cfs); ++cf) { ColumnFamilyHandle* cfh = handles_[cf]; for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) { verify_record_func(i, j, cfh); } } } }; verify_db_func(); Close(); } #endif // !ROCKSDB_LITE INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampCompressionSettings, ::testing::Combine( ::testing::Values(std::shared_ptr(nullptr), std::shared_ptr( NewBloomFilterPolicy(10, false))), ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression, kLZ4HCCompression, kZSTD), ::testing::Values(0, 1 << 14))); class DBBasicTestWithTimestampPrefixSeek : public DBBasicTestWithTimestampBase, public testing::WithParamInterface< std::tuple, std::shared_ptr, bool>> { public: DBBasicTestWithTimestampPrefixSeek() : DBBasicTestWithTimestampBase( "/db_basic_test_with_timestamp_prefix_seek") {} }; TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) { const size_t kNumKeysPerFile = 4096; Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; // TODO(yanqin): re-enable auto compactions options.disable_auto_compactions = true; const size_t kTimestampSize = Timestamp(0, 0).size(); TestComparator test_cmp(kTimestampSize); options.comparator = &test_cmp; options.prefix_extractor = std::get<0>(GetParam()); options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); BlockBasedTableOptions bbto; bbto.filter_policy = std::get<1>(GetParam()); options.table_factory.reset(NewBlockBasedTableFactory(bbto)); 
DestroyAndReopen(options); const uint64_t kMaxKey = 0xffffffffffffffff; const uint64_t kMinKey = 0xffffffffffff8000; const std::vector write_ts_list = {Timestamp(3, 0xffffffff), Timestamp(6, 0xffffffff)}; WriteOptions write_opts; { for (size_t i = 0; i != write_ts_list.size(); ++i) { Slice write_ts = write_ts_list[i]; write_opts.timestamp = &write_ts; uint64_t key = kMinKey; do { Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); ASSERT_OK(s); if (key == kMaxKey) { break; } ++key; } while (true); } } const std::vector read_ts_list = {Timestamp(5, 0xffffffff), Timestamp(9, 0xffffffff)}; { ReadOptions read_opts; read_opts.total_order_seek = false; read_opts.prefix_same_as_start = std::get<2>(GetParam()); fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(), bbto.filter_policy ? bbto.filter_policy->Name() : "null", static_cast(read_opts.prefix_same_as_start)); for (size_t i = 0; i != read_ts_list.size(); ++i) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; std::unique_ptr iter(db_->NewIterator(read_opts)); // Seek to kMaxKey iter->Seek(Key1(kMaxKey)); CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i), write_ts_list[i]); iter->Next(); ASSERT_FALSE(iter->Valid()); } const std::vector targets = {kMinKey, kMinKey + 0x10, kMinKey + 0x100, kMaxKey}; const SliceTransform* const pe = options.prefix_extractor.get(); ASSERT_NE(nullptr, pe); const size_t kPrefixShift = 8 * (Key1(0).size() - pe->Transform(Key1(0)).size()); const uint64_t kPrefixMask = ~((static_cast(1) << kPrefixShift) - 1); const uint64_t kNumKeysWithinPrefix = (static_cast(1) << kPrefixShift); for (size_t i = 0; i != read_ts_list.size(); ++i) { Slice read_ts = read_ts_list[i]; read_opts.timestamp = &read_ts; std::unique_ptr it(db_->NewIterator(read_opts)); for (size_t j = 0; j != targets.size(); ++j) { std::string start_key = Key1(targets[j]); uint64_t expected_ub = (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix; uint64_t 
expected_key = targets[j]; size_t count = 0; it->Seek(Key1(targets[j])); while (it->Valid()) { std::string saved_prev_key; saved_prev_key.assign(it->key().data(), it->key().size()); // Out of prefix if (!read_opts.prefix_same_as_start && pe->Transform(saved_prev_key) != pe->Transform(start_key)) { break; } CheckIterUserEntry(it.get(), Key1(expected_key), "value" + std::to_string(i), write_ts_list[i]); ++count; ++expected_key; it->Next(); } ASSERT_EQ(expected_ub - targets[j] + 1, count); } } } Close(); } // TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g. // NoopTransform. INSTANTIATE_TEST_CASE_P( Timestamp, DBBasicTestWithTimestampPrefixSeek, ::testing::Combine( ::testing::Values( std::shared_ptr(NewFixedPrefixTransform(4)), std::shared_ptr(NewFixedPrefixTransform(7)), std::shared_ptr(NewFixedPrefixTransform(8))), ::testing::Values(std::shared_ptr(nullptr), std::shared_ptr( NewBloomFilterPolicy(10 /*bits_per_key*/, false)), std::shared_ptr( NewBloomFilterPolicy(20 /*bits_per_key*/, false))), ::testing::Bool())); } // namespace ROCKSDB_NAMESPACE #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS extern "C" { void RegisterCustomObjects(int argc, char** argv); } #else void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {} #endif // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); RegisterCustomObjects(argc, argv); return RUN_ALL_TESTS(); }