From 9c8ad62691103e2bcb38b0e828d57bb2e1c326d1 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 6 Mar 2014 11:36:39 -0800 Subject: [PATCH 1/7] DB Sanity Test Summary: @kailiu mentioned on meeting yesterday that we sometimes have trouble opening DB created by old version with the new version. This will be very important to test for column families, since I'm changing disk format for the MANIFEST. I added a tool that can help us test that. Usage: ./db_sanity_test create will create a bunch of DBs under ./db_sanity_test verify will verify consistency of DBs created under Test Plan: ran the db_sanity_test Reviewers: kailiu, dhruba, haobo Reviewed By: kailiu CC: leveldb, kailiu, xjin Differential Revision: https://reviews.facebook.net/D16605 --- Makefile | 3 + tools/db_sanity_test.cc | 201 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 tools/db_sanity_test.cc diff --git a/Makefile b/Makefile index 7a8e16777..a7fcbf0cd 100644 --- a/Makefile +++ b/Makefile @@ -234,6 +234,9 @@ block_hash_index_test: table/block_hash_index_test.o $(LIBOBJECTS) $(TESTHARNESS db_stress: tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL) $(CXX) tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +db_sanity_test: tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL) + $(CXX) tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + db_repl_stress: tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL) $(CXX) tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/tools/db_sanity_test.cc b/tools/db_sanity_test.cc new file mode 100644 index 000000000..f05d6f7ce --- /dev/null +++ b/tools/db_sanity_test.cc @@ -0,0 +1,201 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include + +#include "include/rocksdb/db.h" +#include "include/rocksdb/options.h" +#include "include/rocksdb/env.h" +#include "include/rocksdb/slice.h" +#include "include/rocksdb/status.h" +#include "include/rocksdb/comparator.h" +#include "include/rocksdb/table.h" +#include "include/rocksdb/slice_transform.h" + +namespace rocksdb { + +class SanityTest { + public: + explicit SanityTest(const std::string& path) + : env_(Env::Default()), path_(path) { + env_->CreateDirIfMissing(path); + } + virtual ~SanityTest() {} + + virtual std::string Name() const = 0; + virtual Options GetOptions() const = 0; + + Status Create() { + Options options = GetOptions(); + options.create_if_missing = true; + std::string dbname = path_ + Name(); + DestroyDB(dbname, options); + DB* db; + Status s = DB::Open(options, dbname, &db); + std::unique_ptr db_guard(db); + if (!s.ok()) { + return s; + } + for (int i = 0; i < 1000000; ++i) { + std::string k = "key" + std::to_string(i); + std::string v = "value" + std::to_string(i); + s = db->Put(WriteOptions(), Slice(k), Slice(v)); + if (!s.ok()) { + return s; + } + } + return Status::OK(); + } + Status Verify() { + DB* db; + std::string dbname = path_ + Name(); + Status s = DB::Open(GetOptions(), dbname, &db); + std::unique_ptr db_guard(db); + if (!s.ok()) { + return s; + } + for (int i = 0; i < 1000000; ++i) { + std::string k = "key" + std::to_string(i); + std::string v = "value" + std::to_string(i); + std::string result; + s = db->Get(ReadOptions(), Slice(k), &result); + if (!s.ok()) { + return s; + } + if (result != v) { + return Status::Corruption("Unexpected value for key " + k); + } + } + return Status::OK(); + } + + private: + Env* env_; + std::string const path_; +}; + +class SanityTestBasic : public SanityTest { + public: + explicit SanityTestBasic(const std::string& path) : SanityTest(path) {} + virtual Options GetOptions() const { + Options options; + options.create_if_missing = true; + return options; + } + virtual std::string Name() const { return "Basic"; } +}; + +class SanityTestSpecialComparator : public SanityTest { + public: + explicit SanityTestSpecialComparator(const std::string& path) + : SanityTest(path) { + options_.comparator = new NewComparator(); + } + ~SanityTestSpecialComparator() { delete options_.comparator; } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "SpecialComparator"; } + + private: + class NewComparator : public Comparator { + public: + virtual const char* Name() const { return "rocksdb.NewComparator"; } + virtual int Compare(const Slice& a, const Slice& b) const { + return BytewiseComparator()->Compare(a, b); + } + virtual void FindShortestSeparator(std::string* s, const Slice& l) const { + BytewiseComparator()->FindShortestSeparator(s, l); + } + virtual void FindShortSuccessor(std::string* key) const { + BytewiseComparator()->FindShortSuccessor(key); + } + }; + Options options_; +}; + +class SanityTestZlibCompression : public SanityTest { + public: + explicit SanityTestZlibCompression(const std::string& path) + : SanityTest(path) { + options_.compression = kZlibCompression; + } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "ZlibCompression"; } + + private: + Options options_; +}; + +class SanityTestPlainTableFactory : public SanityTest { + public: + explicit SanityTestPlainTableFactory(const std::string& path) + : SanityTest(path) { + options_.table_factory.reset(NewPlainTableFactory()); + options_.prefix_extractor = NewFixedPrefixTransform(2); + options_.allow_mmap_reads = true; + } + ~SanityTestPlainTableFactory() { delete options_.prefix_extractor; } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "PlainTable"; } + + private: + Options options_; +}; + +bool RunSanityTests(const std::string& command, const std::string& path) { + std::vector sanity_tests = { + new SanityTestBasic(path), + new SanityTestSpecialComparator(path), + new SanityTestZlibCompression(path), + new SanityTestPlainTableFactory(path)}; + + if (command == "create") { + fprintf(stderr, "Creating...\n"); + } else { + fprintf(stderr, "Verifying...\n"); + } + for (auto sanity_test : sanity_tests) { + Status s; + fprintf(stderr, "%s -- ", sanity_test->Name().c_str()); + if (command == "create") { + s = sanity_test->Create(); + } else { + assert(command == "verify"); + s = sanity_test->Verify(); + } + fprintf(stderr, "%s\n", s.ToString().c_str()); + if (!s.ok()) { + fprintf(stderr, "FAIL\n"); + return false; + } + + delete sanity_test; + } + return true; +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + std::string path, command; + bool ok = (argc == 3); + if (ok) { + path = std::string(argv[1]); + command = std::string(argv[2]); + ok = (command == "create" || command == "verify"); + } + if (!ok) { + fprintf(stderr, "Usage: %s [create|verify] \n", argv[0]); + exit(1); + } + if (path.back() != '/') { + path += "/"; + } + + bool sanity_ok = rocksdb::RunSanityTests(command, path); + + return sanity_ok ? 0 : 1; +} From 26ac5603f479d930b954fbe7083bdc5bfc2fe4dc Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 6 Mar 2014 15:59:27 -0800 Subject: [PATCH 2/7] Truncate unused space on PosixWritableFile::Close() Summary: Blocks allocated with fallocate will take extra space on disk even if they are unused and the file is close. Now we remove the extra blocks at the end of the file by calling `ftruncate`. Test Plan: added a test to env_test Reviewers: dhruba Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D16647 --- util/env_posix.cc | 9 +++++++++ util/env_test.cc | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/util/env_posix.cc b/util/env_posix.cc index fcfea28ab..ef7655e60 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -678,10 +678,19 @@ class PosixWritableFile : public WritableFile { Status s; s = Flush(); // flush cache to OS if (!s.ok()) { + return s; } TEST_KILL_RANDOM(rocksdb_kill_odds); + size_t block_size; + size_t last_allocated_block; + GetPreallocationStatus(&block_size, &last_allocated_block); + if (last_allocated_block > 0) { + // trim the extra space preallocated at the end of the file + ftruncate(fd_, filesize_); // ignore errors + } + if (close(fd_) < 0) { if (s.ok()) { s = IOError(filename_, errno); diff --git a/util/env_test.cc b/util/env_test.cc index eb2829303..a442e3a5c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -12,6 +12,11 @@ #include #include +#ifdef OS_LINUX +#include +#include +#endif + #include "rocksdb/env.h" #include "port/port.h" #include "util/coding.h" @@ -258,6 +263,41 @@ TEST(EnvPosixTest, RandomAccessUniqueID) { env_->DeleteFile(fname); } +// only works in linux platforms +#ifdef ROCKSDB_FALLOCATE_PRESENT +TEST(EnvPosixTest, AllocateTest) { + std::string fname = GetOnDiskTestDir() + "/preallocate_testfile"; + EnvOptions soptions; + soptions.use_mmap_writes = false; + unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + // allocate 100 MB + size_t kPreallocateSize = 100 * 1024 * 1024; + size_t kBlockSize = 512; + size_t kPageSize = 4096; + std::string data = "test"; + wfile->SetPreallocationBlockSize(kPreallocateSize); + ASSERT_OK(wfile->Append(Slice(data))); + ASSERT_OK(wfile->Flush()); + + struct stat f_stat; + stat(fname.c_str(), &f_stat); + ASSERT_EQ(data.size(), f_stat.st_size); + // verify that blocks are preallocated + ASSERT_EQ(kPreallocateSize / kBlockSize, f_stat.st_blocks); + + // close the file, should deallocate the blocks + wfile.reset(); + + stat(fname.c_str(), &f_stat); + ASSERT_EQ(data.size(), f_stat.st_size); + // verify that preallocated blocks were deallocated on file close + size_t data_blocks_pages = ((data.size() + kPageSize - 1) / kPageSize); + ASSERT_EQ(data_blocks_pages * kPageSize / kBlockSize, f_stat.st_blocks); +} +#endif + // Returns true if any of the strings in ss are the prefix of another string. bool HasPrefix(const std::unordered_set& ss) { for (const std::string& s: ss) { From e1f52b6a22537ff47e7f7f4d85a3dcc3232ea3e5 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 6 Mar 2014 15:12:34 -0800 Subject: [PATCH 3/7] Fix Valgrind error introduced by D16515 Summary: valgrind reports issues. This patch seems to fix it. Test Plan: run the tests that fails in valgrind Reviewers: igor, haobo, kailiu Reviewed By: kailiu CC: dhruba, ljin, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D16653 --- include/rocksdb/env.h | 2 +- util/env.cc | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 68ab1bb6a..b5f78d1c3 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -584,7 +584,7 @@ class LogBuffer { ~LogBuffer(); // Add a log entry to the buffer. - void AddLogToBuffer(const char* format, ...); + void AddLogToBuffer(const char* format, va_list ap); // Flush all buffered log to the info log. void FlushBufferToLog() const; diff --git a/util/env.cc b/util/env.cc index 9624c04ab..36bc78d55 100644 --- a/util/env.cc +++ b/util/env.cc @@ -49,7 +49,7 @@ LogBuffer::LogBuffer(const InfoLogLevel log_level, LogBuffer::~LogBuffer() { delete rep_; } -void LogBuffer::AddLogToBuffer(const char* format, ...) { +void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { // Skip the level because of its level. return; @@ -69,10 +69,10 @@ void LogBuffer::AddLogToBuffer(const char* format, ...) { // Print the message if (p < limit) { - va_list ap; - va_start(ap, format); - p += vsnprintf(p, limit - p, format, ap); - va_end(ap); + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); } // Add '\0' to the end @@ -102,7 +102,7 @@ void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { if (log_buffer != nullptr) { va_list ap; va_start(ap, format); - log_buffer->AddLogToBuffer(format); + log_buffer->AddLogToBuffer(format, ap); va_end(ap); } } From 566f18e6ad0499310bca0c960e2c12e2bb9dd400 Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Thu, 6 Mar 2014 17:30:46 -0800 Subject: [PATCH 4/7] More precise calculation of sub_index_size Summary: Previous we did rough estimation of subindex size, which in worst case may result in array reallocation. This patch aims to get the exact size and avoid any reallocation. Test Plan: make all check Reviewers: sdong, dhruba, haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16125 --- table/plain_table_reader.cc | 67 +++++++++++++------------------------ table/plain_table_reader.h | 16 ++++----- 2 files changed, 32 insertions(+), 51 deletions(-) diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 8b675af11..46886291e 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -287,8 +287,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) { size_t PlainTableReader::BucketizeIndexesAndFillBloom( IndexRecordList* record_list, std::vector* hash_to_offsets, - std::vector* bucket_count) { - size_t sub_index_size_needed = 0; + std::vector* entries_per_bucket) { bool first = true; uint32_t prev_hash = 0; size_t num_records = record_list->GetNumRecords(); @@ -306,36 +305,31 @@ size_t PlainTableReader::BucketizeIndexesAndFillBloom( IndexRecord* prev_bucket_head = (*hash_to_offsets)[bucket]; index_record->next = prev_bucket_head; (*hash_to_offsets)[bucket] = index_record; - auto& item_count = (*bucket_count)[bucket]; - if (item_count > 0) { - if (item_count == 1) { - sub_index_size_needed += kOffsetLen + 1; - } - if (item_count == 127) { - // Need more than one byte for length - sub_index_size_needed++; - } - sub_index_size_needed += kOffsetLen; + (*entries_per_bucket)[bucket]++; + } + size_t sub_index_size = 0; + for (auto entry_count : *entries_per_bucket) { + if (entry_count <= 1) { + continue; } - item_count++; + // Only buckets with more than 1 entry will have subindex. + sub_index_size += VarintLength(entry_count); + // total bytes needed to store these entries' in-file offsets. + sub_index_size += entry_count * kOffsetLen; } - return sub_index_size_needed; + return sub_index_size; } void PlainTableReader::FillIndexes( - size_t sub_index_size_needed, + const size_t kSubIndexSize, const std::vector& hash_to_offsets, - const std::vector& bucket_count) { - Log(options_.info_log, "Reserving %zu bytes for sub index", - sub_index_size_needed); - // 8 bytes buffer for variable length size - size_t buffer_size = 8 * 8; - size_t buffer_used = 0; - sub_index_size_needed += buffer_size; - sub_index_.reset(new char[sub_index_size_needed]); + const std::vector& entries_per_bucket) { + Log(options_.info_log, "Reserving %zu bytes for plain table's sub_index", + kSubIndexSize); + sub_index_.reset(new char[kSubIndexSize]); size_t sub_index_offset = 0; for (int i = 0; i < index_size_; i++) { - uint32_t num_keys_for_bucket = bucket_count[i]; + uint32_t num_keys_for_bucket = entries_per_bucket[i]; switch (num_keys_for_bucket) { case 0: // No key for bucket @@ -351,21 +345,6 @@ void PlainTableReader::FillIndexes( char* prev_ptr = &sub_index_[sub_index_offset]; char* cur_ptr = EncodeVarint32(prev_ptr, num_keys_for_bucket); sub_index_offset += (cur_ptr - prev_ptr); - if (cur_ptr - prev_ptr > 2 - || (cur_ptr - prev_ptr == 2 && num_keys_for_bucket <= 127)) { - // Need to resize sub_index. Exponentially grow buffer. - buffer_used += cur_ptr - prev_ptr - 1; - if (buffer_used + 4 > buffer_size) { - Log(options_.info_log, "Recalculate suffix_map length to %zu", - sub_index_size_needed); - - sub_index_size_needed += buffer_size; - buffer_size *= 2; - char* new_sub_index = new char[sub_index_size_needed]; - memcpy(new_sub_index, sub_index_.get(), sub_index_offset); - sub_index_.reset(new_sub_index); - } - } char* sub_index_pos = &sub_index_[sub_index_offset]; IndexRecord* record = hash_to_offsets[i]; int j; @@ -375,12 +354,14 @@ void PlainTableReader::FillIndexes( } assert(j == -1 && record == nullptr); sub_index_offset += kOffsetLen * num_keys_for_bucket; + assert(sub_index_offset <= kSubIndexSize); break; } } + assert(sub_index_offset == kSubIndexSize); Log(options_.info_log, "hash table size: %d, suffix_map length %zu", - index_size_, sub_index_size_needed); + index_size_, kSubIndexSize); } Status PlainTableReader::PopulateIndex() { @@ -422,11 +403,11 @@ Status PlainTableReader::PopulateIndex() { // Bucketize all the index records to a temp data structure, in which for // each bucket, we generate a linked list of IndexRecord, in reversed order. std::vector hash_to_offsets(index_size_, nullptr); - std::vector bucket_count(index_size_, 0); + std::vector entries_per_bucket(index_size_, 0); size_t sub_index_size_needed = BucketizeIndexesAndFillBloom( - &record_list, &hash_to_offsets, &bucket_count); + &record_list, &hash_to_offsets, &entries_per_bucket); // From the temp data structure, populate indexes. - FillIndexes(sub_index_size_needed, hash_to_offsets, bucket_count); + FillIndexes(sub_index_size_needed, hash_to_offsets, entries_per_bucket); return Status::OK(); } diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 16bbc8ba5..a93b3dd35 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -196,18 +196,18 @@ class PlainTableReader: public TableReader { // containing a linklist of IndexRecord hashed to the same bucket, in reverse // order. // of offsets for the hash, in reversed order. - // bucket_count is sized of index_size_. The value is how many index + // entries_per_bucket is sized of index_size_. The value is how many index // records are there in bucket_headers for the same bucket. - size_t BucketizeIndexesAndFillBloom(IndexRecordList* record_list, - std::vector* bucket_headers, - std::vector* bucket_count); + size_t BucketizeIndexesAndFillBloom( + IndexRecordList* record_list, std::vector* bucket_headers, + std::vector* entries_per_bucket); // Internal helper class to fill the indexes and bloom filters to internal - // data structures. bucket_headers and bucket_count are bucketized indexes - // and counts generated by BucketizeIndexesAndFillBloom(). - void FillIndexes(size_t sub_index_size_needed, + // data structures. bucket_headers and entries_per_bucket are bucketized + // indexes and counts generated by BucketizeIndexesAndFillBloom(). + void FillIndexes(const size_t kSubIndexSize, const std::vector& bucket_headers, - const std::vector& bucket_count); + const std::vector& entries_per_bucket); // Read a plain table key from the position `start`. The read content // will be written to `key` and the size of read bytes will be populated From eec86952062cf1e298647823707111a23e9ca4b5 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 6 Mar 2014 18:15:26 -0800 Subject: [PATCH 5/7] Delete local sv when destroying DB from stress test Summary: Not deleting local SV caused some an crash test issue: http://ci-builds.fb.com/job/rocksdb_asan_crash_test/83/console Test Plan: ran unit tests Reviewers: ljin Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D16635 --- db/db_impl.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index d05424410..b8942c42a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -402,6 +402,15 @@ void DBImpl::TEST_Destroy_DBImpl() { bg_logstats_scheduled_) { bg_cv_.Wait(); } + mutex_.Unlock(); + + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. + // It also needs to be done after FlushMemTable, which can trigger local_sv_ + // access. + delete local_sv_; + + mutex_.Lock(); if (super_version_ != nullptr) { bool is_last_reference __attribute__((unused)); is_last_reference = super_version_->Unref(); From 056a0286d279239c279d1e430e7a4b8619f4a337 Mon Sep 17 00:00:00 2001 From: Yumikiyo Osanai Date: Sat, 8 Mar 2014 02:14:34 +0900 Subject: [PATCH 6/7] Modify the compile error about ftruncate() Summary: Change to store the return value from ftruncate(). The reason is that ftruncate() has "warn_unused_result" attribute in some environment. Signed-off-by: Yumikiyo Osanai --- util/env_posix.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index ef7655e60..e019d6af0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -688,7 +688,8 @@ class PosixWritableFile : public WritableFile { GetPreallocationStatus(&block_size, &last_allocated_block); if (last_allocated_block > 0) { // trim the extra space preallocated at the end of the file - ftruncate(fd_, filesize_); // ignore errors + int dummy __attribute__((unused)); + dummy = ftruncate(fd_, filesize_); // ignore errors } if (close(fd_) < 0) { From e5fa4944fcba1df6ac414858ca36d64cefa15d0a Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Fri, 7 Mar 2014 14:43:22 -0800 Subject: [PATCH 7/7] use CAS when returning SuperVersion to ThreadLocal Summary: Add a check at the end of GetImpl to release SuperVersion if it becomes obsolete. Also do Scrape() inside InstallSuperVersion so it happens more frequent. Test Plan: make all check running asan_check now Reviewers: igor, haobo, sdong, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16641 --- db/db_impl.cc | 53 ++++++++++++++++++++++++++++-------- db/db_impl.h | 13 +++++++++ include/rocksdb/statistics.h | 6 ++-- util/thread_local.cc | 25 ++++++++++++++--- util/thread_local.h | 20 ++++++++++---- util/thread_local_test.cc | 20 ++++++++++++-- 6 files changed, 112 insertions(+), 25 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index b8942c42a..99cfc6e6c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -63,6 +63,10 @@ namespace rocksdb { +int DBImpl::SuperVersion::dummy = 0; +void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy; +void* const DBImpl::SuperVersion::kSVObsolete = nullptr; + void DumpLeveldbBuildVersion(Logger * log); // Information kept for every waiting writer @@ -1327,10 +1331,6 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, if (s.ok()) { InstallSuperVersion(deletion_state); - // Reset SuperVersions cached in thread local storage - if (options_.allow_thread_local) { - ResetThreadLocalSuperVersions(&deletion_state); - } if (madeProgress) { *madeProgress = 1; } @@ -2874,6 +2874,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { SuperVersion* old_superversion = InstallSuperVersion(new_superversion); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); + // Reset SuperVersions cached in thread local storage + if (options_.allow_thread_local) { + ResetThreadLocalSuperVersions(&deletion_state); + } } DBImpl::SuperVersion* DBImpl::InstallSuperVersion( @@ -2896,9 +2900,12 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion( void DBImpl::ResetThreadLocalSuperVersions(DeletionState* deletion_state) { mutex_.AssertHeld(); autovector sv_ptrs; - local_sv_->Scrape(&sv_ptrs); + local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete); for (auto ptr : sv_ptrs) { assert(ptr); + if (ptr == SuperVersion::kSVInUse) { + continue; + } auto sv = static_cast(ptr); if (static_cast(ptr)->Unref()) { sv->Cleanup(); @@ -2936,10 +2943,17 @@ Status DBImpl::GetImpl(const ReadOptions& options, // is being used while a new SuperVersion is installed, the cached // SuperVersion can become stale. It will eventually get refreshed either // on the next GetImpl() call or next SuperVersion installation. - sv = static_cast(local_sv_->Swap(nullptr)); - if (!sv || sv->version_number != - super_version_number_.load(std::memory_order_relaxed)) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES); + void* ptr = local_sv_->Swap(SuperVersion::kSVInUse); + // Invariant: + // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage + // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage + // should only keep kSVInUse during a GetImpl. + assert(ptr != SuperVersion::kSVInUse); + sv = static_cast(ptr); + if (sv == SuperVersion::kSVObsolete || + sv->version_number != super_version_number_.load( + std::memory_order_relaxed)) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); SuperVersion* sv_to_delete = nullptr; if (sv && sv->Unref()) { @@ -2999,11 +3013,25 @@ Status DBImpl::GetImpl(const ReadOptions& options, mutex_.Unlock(); } - // Release SuperVersion + bool unref_sv = true; if (LIKELY(options_.allow_thread_local)) { // Put the SuperVersion back - local_sv_->Reset(static_cast(sv)); - } else { + void* expected = SuperVersion::kSVInUse; + if (local_sv_->CompareAndSwap(static_cast(sv), expected)) { + // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal + // storage has not been altered and no Scrape has happend. The + // SuperVersion is still current. + unref_sv = false; + } else { + // ThreadLocal scrape happened in the process of this GetImpl call (after + // thread local Swap() at the beginning and before CompareAndSwap()). + // This means the SuperVersion it holds is obsolete. + assert(expected == SuperVersion::kSVObsolete); + } + } + + if (unref_sv) { + // Release SuperVersion bool delete_sv = false; if (sv->Unref()) { mutex_.Lock(); @@ -3014,6 +3042,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, if (delete_sv) { delete sv; } + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES); } // Note, tickers are atomic now - no lock protection needed any more. diff --git a/db/db_impl.h b/db/db_impl.h index c27dac849..2877317cb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -174,9 +174,22 @@ class DBImpl : public DB { void Cleanup(); void Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); + + // The value of dummy is not actually used. kSVInUse takes its address as a + // mark in the thread local storage to indicate the SuperVersion is in use + // by thread. This way, the value of kSVInUse is guaranteed to have no + // conflict with SuperVersion object address and portable on different + // platform. + static int dummy; + static void* const kSVInUse; + static void* const kSVObsolete; }; static void SuperVersionUnrefHandle(void* ptr) { + // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets + // destroyed. When former happens, the thread shouldn't see kSVInUse. + // When latter happens, we are in ~DBImpl(), no get should happen as well. + assert(ptr != SuperVersion::kSVInUse); DBImpl::SuperVersion* sv = static_cast(ptr); if (sv->Unref()) { sv->db->mutex_.Lock(); diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 82cc7133f..d076f6f76 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -122,7 +122,8 @@ enum Tickers { // Number of table's properties loaded directly from file, without creating // table reader object. NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, - NUMBER_SUPERVERSION_UPDATES, + NUMBER_SUPERVERSION_ACQUIRES, + NUMBER_SUPERVERSION_RELEASES, TICKER_ENUM_MAX }; @@ -178,7 +179,8 @@ const std::vector> TickersNameMap = { {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, "rocksdb.number.direct.load.table.properties"}, - {NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"}, + {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"}, + {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"}, }; /** diff --git a/util/thread_local.cc b/util/thread_local.cc index 2e5d3618b..1b4220b8f 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -138,12 +138,25 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) { return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed); } -void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector* ptrs) { +bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr, + void*& expected) { + auto* tls = GetThreadLocal(); + if (UNLIKELY(id >= tls->entries.size())) { + // Need mutex to protect entries access within ReclaimId + MutexLock l(&mutex_); + tls->entries.resize(id + 1); + } + return tls->entries[id].ptr.compare_exchange_strong(expected, ptr, + std::memory_order_relaxed, std::memory_order_relaxed); +} + +void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector* ptrs, + void* const replacement) { MutexLock l(&mutex_); for (ThreadData* t = head_.next; t != &head_; t = t->next) { if (id < t->entries.size()) { void* ptr = - t->entries[id].ptr.exchange(nullptr, std::memory_order_relaxed); + t->entries[id].ptr.exchange(replacement, std::memory_order_relaxed); if (ptr != nullptr) { ptrs->push_back(ptr); } @@ -225,8 +238,12 @@ void* ThreadLocalPtr::Swap(void* ptr) { return StaticMeta::Instance()->Swap(id_, ptr); } -void ThreadLocalPtr::Scrape(autovector* ptrs) { - StaticMeta::Instance()->Scrape(id_, ptrs); +bool ThreadLocalPtr::CompareAndSwap(void* ptr, void*& expected) { + return StaticMeta::Instance()->CompareAndSwap(id_, ptr, expected); +} + +void ThreadLocalPtr::Scrape(autovector* ptrs, void* const replacement) { + StaticMeta::Instance()->Scrape(id_, ptrs, replacement); } } // namespace rocksdb diff --git a/util/thread_local.h b/util/thread_local.h index d6fc5f085..d1434e3e5 100644 --- a/util/thread_local.h +++ b/util/thread_local.h @@ -47,9 +47,15 @@ class ThreadLocalPtr { // Atomically swap the supplied ptr and return the previous value void* Swap(void* ptr); - // Return non-nullptr data for all existing threads and reset them - // to nullptr - void Scrape(autovector* ptrs); + // Atomically compare the stored value with expected. Set the new + // pointer value to thread local only if the comparision is true. + // Otherwise, expected returns the stored value. + // Return true on success, false on failure + bool CompareAndSwap(void* ptr, void*& expected); + + // Reset all thread local data to replacement, and return non-nullptr + // data for all existing threads + void Scrape(autovector* ptrs, void* const replacement); protected: struct Entry { @@ -99,8 +105,12 @@ class ThreadLocalPtr { void Reset(uint32_t id, void* ptr); // Atomically swap the supplied ptr and return the previous value void* Swap(uint32_t id, void* ptr); - // Return data for all existing threads and return them to nullptr - void Scrape(uint32_t id, autovector* ptrs); + // Atomically compare and swap the provided value only if it equals + // to expected value. + bool CompareAndSwap(uint32_t id, void* ptr, void*& expected); + // Reset all thread local data to replacement, and return non-nullptr + // data for all existing threads + void Scrape(uint32_t id, autovector* ptrs, void* const replacement); // Register the UnrefHandler for id void SetHandler(uint32_t id, UnrefHandler handler); diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc index 96e35d959..b1a865d5e 100644 --- a/util/thread_local_test.cc +++ b/util/thread_local_test.cc @@ -435,8 +435,8 @@ TEST(ThreadLocalTest, Scrape) { // Scrape all thread local data. No unref at thread // exit or ThreadLocalPtr destruction autovector ptrs; - p.tls1.Scrape(&ptrs); - p.tls2->Scrape(&ptrs); + p.tls1.Scrape(&ptrs, nullptr); + p.tls2->Scrape(&ptrs, nullptr); delete p.tls2; // Signal to exit mu.Lock(); @@ -449,6 +449,22 @@ TEST(ThreadLocalTest, Scrape) { } } +TEST(ThreadLocalTest, CompareAndSwap) { + ThreadLocalPtr tls; + ASSERT_TRUE(tls.Swap(reinterpret_cast(1)) == nullptr); + void* expected = reinterpret_cast(1); + // Swap in 2 + ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast(2), expected)); + expected = reinterpret_cast(100); + // Fail Swap, still 2 + ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast(2), expected)); + ASSERT_EQ(expected, reinterpret_cast(2)); + // Swap in 3 + expected = reinterpret_cast(2); + ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast(3), expected)); + ASSERT_EQ(tls.Get(), reinterpret_cast(3)); +} + } // namespace rocksdb int main(int argc, char** argv) {