diff --git a/Makefile b/Makefile
index 74583ca3b..85c9d521e 100644
--- a/Makefile
+++ b/Makefile
@@ -235,6 +235,9 @@ block_hash_index_test: table/block_hash_index_test.o $(LIBOBJECTS) $(TESTHARNESS
 db_stress: tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL)
 	$(CXX) tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
+db_sanity_test: tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL)
+	$(CXX) tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
+
 db_repl_stress: tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL)
 	$(CXX) tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
 
diff --git a/db/column_family.cc b/db/column_family.cc
index 0aee751ca..2aeda11f3 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -111,6 +111,9 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
   return result;
 }
 
+int SuperVersion::dummy = 0;
+void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
+void* const SuperVersion::kSVObsolete = nullptr;
 
 SuperVersion::~SuperVersion() {
   for (auto td : to_delete) {
@@ -153,6 +156,10 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
 
 namespace {
 void SuperVersionUnrefHandle(void* ptr) {
+  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
+  // destroyed. When the former happens, the thread must not see kSVInUse.
+  // When the latter happens, we are in ~ColumnFamilyData(), so no Get can be
+  // in flight either.
   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
   if (sv->Unref()) {
     sv->db_mutex->Lock();
@@ -299,9 +306,12 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
 
 void ColumnFamilyData::ResetThreadLocalSuperVersions() {
   autovector<void*> sv_ptrs;
-  local_sv_->Scrape(&sv_ptrs);
+  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
   for (auto ptr : sv_ptrs) {
     assert(ptr);
+    if (ptr == SuperVersion::kSVInUse) {
+      continue;
+    }
     auto sv = static_cast<SuperVersion*>(ptr);
     if (sv->Unref()) {
       sv->Cleanup();
diff --git a/db/column_family.h b/db/column_family.h
index 560e3dabd..d09174521 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -92,6 +92,15 @@ struct SuperVersion {
   void Cleanup();
   void Init(MemTable* new_mem, MemTableListVersion* new_imm,
             Version* new_current);
+
+  // The value of dummy is not actually used. kSVInUse takes its address as a
+  // marker in thread local storage to indicate that the SuperVersion is in
+  // use by the thread. This way, the value of kSVInUse is guaranteed not to
+  // conflict with any SuperVersion object address, and it is portable across
+  // platforms.
+ static int dummy; + static void* const kSVInUse; + static void* const kSVObsolete; }; extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, @@ -163,12 +172,7 @@ class ColumnFamilyData { } SuperVersion* GetSuperVersion() const { return super_version_; } - SuperVersion* GetAndResetThreadLocalSuperVersion() const { - return static_cast(local_sv_->Swap(nullptr)); - } - void SetThreadLocalSuperVersion(SuperVersion* super_version) { - local_sv_->Reset(static_cast(super_version)); - } + ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); } uint64_t GetSuperVersionNumber() const { return super_version_number_.load(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 57e54c821..9b946cb1c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1263,10 +1263,6 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, if (s.ok()) { InstallSuperVersion(cfd, deletion_state); - // Reset SuperVersions cached in thread local storage - if (options_.allow_thread_local) { - cfd->ResetThreadLocalSuperVersions(); - } if (madeProgress) { *madeProgress = 1; } @@ -2876,6 +2872,10 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, cfd->InstallSuperVersion(new_superversion, &mutex_); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); + // Reset SuperVersions cached in thread local storage + if (options_.allow_thread_local) { + cfd->ResetThreadLocalSuperVersions(); + } } Status DBImpl::GetImpl(const ReadOptions& options, @@ -2897,6 +2897,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Acquire SuperVersion SuperVersion* sv = nullptr; + ThreadLocalPtr* thread_local_sv = nullptr; if (LIKELY(options_.allow_thread_local)) { // The SuperVersion is cached in thread local storage to avoid acquiring // mutex when SuperVersion does not change since the last use. When a new @@ -2907,9 +2908,17 @@ Status DBImpl::GetImpl(const ReadOptions& options, // is being used while a new SuperVersion is installed, the cached // SuperVersion can become stale. It will eventually get refreshed either // on the next GetImpl() call or next SuperVersion installation. - sv = cfd->GetAndResetThreadLocalSuperVersion(); - if (!sv || sv->version_number != cfd->GetSuperVersionNumber()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES); + thread_local_sv = cfd->GetThreadLocalSuperVersion(); + void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse); + // Invariant: + // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage + // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage + // should only keep kSVInUse during a GetImpl. + assert(ptr != SuperVersion::kSVInUse); + sv = static_cast(ptr); + if (sv == SuperVersion::kSVObsolete || + sv->version_number != cfd->GetSuperVersionNumber()) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); SuperVersion* sv_to_delete = nullptr; if (sv && sv->Unref()) { @@ -2972,11 +2981,25 @@ Status DBImpl::GetImpl(const ReadOptions& options, mutex_.Unlock(); } - // Release SuperVersion + bool unref_sv = true; if (LIKELY(options_.allow_thread_local)) { // Put the SuperVersion back - cfd->SetThreadLocalSuperVersion(sv); - } else { + void* expected = SuperVersion::kSVInUse; + if (thread_local_sv->CompareAndSwap(static_cast(sv), expected)) { + // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal + // storage has not been altered and no Scrape has happend. The + // SuperVersion is still current. 
+ unref_sv = false; + } else { + // ThreadLocal scrape happened in the process of this GetImpl call (after + // thread local Swap() at the beginning and before CompareAndSwap()). + // This means the SuperVersion it holds is obsolete. + assert(expected == SuperVersion::kSVObsolete); + } + } + + if (unref_sv) { + // Release SuperVersion bool delete_sv = false; if (sv->Unref()) { mutex_.Lock(); @@ -2987,9 +3010,9 @@ Status DBImpl::GetImpl(const ReadOptions& options, if (delete_sv) { delete sv; } + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES); } - // Note, tickers are atomic now - no lock protection needed any more. RecordTick(options_.statistics.get(), NUMBER_KEYS_READ); RecordTick(options_.statistics.get(), BYTES_READ, value->size()); BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 4f323b1b7..9cd5bf47b 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -584,7 +584,7 @@ class LogBuffer { ~LogBuffer(); // Add a log entry to the buffer. - void AddLogToBuffer(const char* format, ...); + void AddLogToBuffer(const char* format, va_list ap); // Flush all buffered log to the info log. void FlushBufferToLog() const; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 82cc7133f..d076f6f76 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -122,7 +122,8 @@ enum Tickers { // Number of table's properties loaded directly from file, without creating // table reader object. NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, - NUMBER_SUPERVERSION_UPDATES, + NUMBER_SUPERVERSION_ACQUIRES, + NUMBER_SUPERVERSION_RELEASES, TICKER_ENUM_MAX }; @@ -178,7 +179,8 @@ const std::vector> TickersNameMap = { {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, "rocksdb.number.direct.load.table.properties"}, - {NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"}, + {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"}, + {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"}, }; /** diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 8b675af11..46886291e 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -287,8 +287,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) { size_t PlainTableReader::BucketizeIndexesAndFillBloom( IndexRecordList* record_list, std::vector* hash_to_offsets, - std::vector* bucket_count) { - size_t sub_index_size_needed = 0; + std::vector* entries_per_bucket) { bool first = true; uint32_t prev_hash = 0; size_t num_records = record_list->GetNumRecords(); @@ -306,36 +305,31 @@ size_t PlainTableReader::BucketizeIndexesAndFillBloom( IndexRecord* prev_bucket_head = (*hash_to_offsets)[bucket]; index_record->next = prev_bucket_head; (*hash_to_offsets)[bucket] = index_record; - auto& item_count = (*bucket_count)[bucket]; - if (item_count > 0) { - if (item_count == 1) { - sub_index_size_needed += kOffsetLen + 1; - } - if (item_count == 127) { - // Need more than one byte for length - sub_index_size_needed++; - } - sub_index_size_needed += kOffsetLen; + (*entries_per_bucket)[bucket]++; + } + size_t sub_index_size = 0; + for (auto entry_count : *entries_per_bucket) { + if (entry_count <= 1) { + continue; } - item_count++; + // Only buckets with more than 1 entry will have subindex. + sub_index_size += VarintLength(entry_count); + // total bytes needed to store these entries' in-file offsets. 
+ sub_index_size += entry_count * kOffsetLen; } - return sub_index_size_needed; + return sub_index_size; } void PlainTableReader::FillIndexes( - size_t sub_index_size_needed, + const size_t kSubIndexSize, const std::vector& hash_to_offsets, - const std::vector& bucket_count) { - Log(options_.info_log, "Reserving %zu bytes for sub index", - sub_index_size_needed); - // 8 bytes buffer for variable length size - size_t buffer_size = 8 * 8; - size_t buffer_used = 0; - sub_index_size_needed += buffer_size; - sub_index_.reset(new char[sub_index_size_needed]); + const std::vector& entries_per_bucket) { + Log(options_.info_log, "Reserving %zu bytes for plain table's sub_index", + kSubIndexSize); + sub_index_.reset(new char[kSubIndexSize]); size_t sub_index_offset = 0; for (int i = 0; i < index_size_; i++) { - uint32_t num_keys_for_bucket = bucket_count[i]; + uint32_t num_keys_for_bucket = entries_per_bucket[i]; switch (num_keys_for_bucket) { case 0: // No key for bucket @@ -351,21 +345,6 @@ void PlainTableReader::FillIndexes( char* prev_ptr = &sub_index_[sub_index_offset]; char* cur_ptr = EncodeVarint32(prev_ptr, num_keys_for_bucket); sub_index_offset += (cur_ptr - prev_ptr); - if (cur_ptr - prev_ptr > 2 - || (cur_ptr - prev_ptr == 2 && num_keys_for_bucket <= 127)) { - // Need to resize sub_index. Exponentially grow buffer. - buffer_used += cur_ptr - prev_ptr - 1; - if (buffer_used + 4 > buffer_size) { - Log(options_.info_log, "Recalculate suffix_map length to %zu", - sub_index_size_needed); - - sub_index_size_needed += buffer_size; - buffer_size *= 2; - char* new_sub_index = new char[sub_index_size_needed]; - memcpy(new_sub_index, sub_index_.get(), sub_index_offset); - sub_index_.reset(new_sub_index); - } - } char* sub_index_pos = &sub_index_[sub_index_offset]; IndexRecord* record = hash_to_offsets[i]; int j; @@ -375,12 +354,14 @@ void PlainTableReader::FillIndexes( } assert(j == -1 && record == nullptr); sub_index_offset += kOffsetLen * num_keys_for_bucket; + assert(sub_index_offset <= kSubIndexSize); break; } } + assert(sub_index_offset == kSubIndexSize); Log(options_.info_log, "hash table size: %d, suffix_map length %zu", - index_size_, sub_index_size_needed); + index_size_, kSubIndexSize); } Status PlainTableReader::PopulateIndex() { @@ -422,11 +403,11 @@ Status PlainTableReader::PopulateIndex() { // Bucketize all the index records to a temp data structure, in which for // each bucket, we generate a linked list of IndexRecord, in reversed order. std::vector hash_to_offsets(index_size_, nullptr); - std::vector bucket_count(index_size_, 0); + std::vector entries_per_bucket(index_size_, 0); size_t sub_index_size_needed = BucketizeIndexesAndFillBloom( - &record_list, &hash_to_offsets, &bucket_count); + &record_list, &hash_to_offsets, &entries_per_bucket); // From the temp data structure, populate indexes. - FillIndexes(sub_index_size_needed, hash_to_offsets, bucket_count); + FillIndexes(sub_index_size_needed, hash_to_offsets, entries_per_bucket); return Status::OK(); } diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index 16bbc8ba5..a93b3dd35 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -196,18 +196,18 @@ class PlainTableReader: public TableReader { // containing a linklist of IndexRecord hashed to the same bucket, in reverse // order. // of offsets for the hash, in reversed order. - // bucket_count is sized of index_size_. The value is how many index + // entries_per_bucket is sized of index_size_. 
The value is how many index // records are there in bucket_headers for the same bucket. - size_t BucketizeIndexesAndFillBloom(IndexRecordList* record_list, - std::vector* bucket_headers, - std::vector* bucket_count); + size_t BucketizeIndexesAndFillBloom( + IndexRecordList* record_list, std::vector* bucket_headers, + std::vector* entries_per_bucket); // Internal helper class to fill the indexes and bloom filters to internal - // data structures. bucket_headers and bucket_count are bucketized indexes - // and counts generated by BucketizeIndexesAndFillBloom(). - void FillIndexes(size_t sub_index_size_needed, + // data structures. bucket_headers and entries_per_bucket are bucketized + // indexes and counts generated by BucketizeIndexesAndFillBloom(). + void FillIndexes(const size_t kSubIndexSize, const std::vector& bucket_headers, - const std::vector& bucket_count); + const std::vector& entries_per_bucket); // Read a plain table key from the position `start`. The read content // will be written to `key` and the size of read bytes will be populated diff --git a/tools/db_sanity_test.cc b/tools/db_sanity_test.cc new file mode 100644 index 000000000..f05d6f7ce --- /dev/null +++ b/tools/db_sanity_test.cc @@ -0,0 +1,201 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include + +#include "include/rocksdb/db.h" +#include "include/rocksdb/options.h" +#include "include/rocksdb/env.h" +#include "include/rocksdb/slice.h" +#include "include/rocksdb/status.h" +#include "include/rocksdb/comparator.h" +#include "include/rocksdb/table.h" +#include "include/rocksdb/slice_transform.h" + +namespace rocksdb { + +class SanityTest { + public: + explicit SanityTest(const std::string& path) + : env_(Env::Default()), path_(path) { + env_->CreateDirIfMissing(path); + } + virtual ~SanityTest() {} + + virtual std::string Name() const = 0; + virtual Options GetOptions() const = 0; + + Status Create() { + Options options = GetOptions(); + options.create_if_missing = true; + std::string dbname = path_ + Name(); + DestroyDB(dbname, options); + DB* db; + Status s = DB::Open(options, dbname, &db); + std::unique_ptr db_guard(db); + if (!s.ok()) { + return s; + } + for (int i = 0; i < 1000000; ++i) { + std::string k = "key" + std::to_string(i); + std::string v = "value" + std::to_string(i); + s = db->Put(WriteOptions(), Slice(k), Slice(v)); + if (!s.ok()) { + return s; + } + } + return Status::OK(); + } + Status Verify() { + DB* db; + std::string dbname = path_ + Name(); + Status s = DB::Open(GetOptions(), dbname, &db); + std::unique_ptr db_guard(db); + if (!s.ok()) { + return s; + } + for (int i = 0; i < 1000000; ++i) { + std::string k = "key" + std::to_string(i); + std::string v = "value" + std::to_string(i); + std::string result; + s = db->Get(ReadOptions(), Slice(k), &result); + if (!s.ok()) { + return s; + } + if (result != v) { + return Status::Corruption("Unexpected value for key " + k); + } + } + return Status::OK(); + } + + private: + Env* env_; + std::string const path_; +}; + +class SanityTestBasic : public SanityTest { + public: + explicit SanityTestBasic(const std::string& path) : SanityTest(path) {} + virtual Options GetOptions() const { + Options options; + options.create_if_missing = true; + return options; + } + virtual std::string Name() const { 
return "Basic"; } +}; + +class SanityTestSpecialComparator : public SanityTest { + public: + explicit SanityTestSpecialComparator(const std::string& path) + : SanityTest(path) { + options_.comparator = new NewComparator(); + } + ~SanityTestSpecialComparator() { delete options_.comparator; } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "SpecialComparator"; } + + private: + class NewComparator : public Comparator { + public: + virtual const char* Name() const { return "rocksdb.NewComparator"; } + virtual int Compare(const Slice& a, const Slice& b) const { + return BytewiseComparator()->Compare(a, b); + } + virtual void FindShortestSeparator(std::string* s, const Slice& l) const { + BytewiseComparator()->FindShortestSeparator(s, l); + } + virtual void FindShortSuccessor(std::string* key) const { + BytewiseComparator()->FindShortSuccessor(key); + } + }; + Options options_; +}; + +class SanityTestZlibCompression : public SanityTest { + public: + explicit SanityTestZlibCompression(const std::string& path) + : SanityTest(path) { + options_.compression = kZlibCompression; + } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "ZlibCompression"; } + + private: + Options options_; +}; + +class SanityTestPlainTableFactory : public SanityTest { + public: + explicit SanityTestPlainTableFactory(const std::string& path) + : SanityTest(path) { + options_.table_factory.reset(NewPlainTableFactory()); + options_.prefix_extractor = NewFixedPrefixTransform(2); + options_.allow_mmap_reads = true; + } + ~SanityTestPlainTableFactory() { delete options_.prefix_extractor; } + virtual Options GetOptions() const { return options_; } + virtual std::string Name() const { return "PlainTable"; } + + private: + Options options_; +}; + +bool RunSanityTests(const std::string& command, const std::string& path) { + std::vector sanity_tests = { + new SanityTestBasic(path), + new SanityTestSpecialComparator(path), + new SanityTestZlibCompression(path), + new SanityTestPlainTableFactory(path)}; + + if (command == "create") { + fprintf(stderr, "Creating...\n"); + } else { + fprintf(stderr, "Verifying...\n"); + } + for (auto sanity_test : sanity_tests) { + Status s; + fprintf(stderr, "%s -- ", sanity_test->Name().c_str()); + if (command == "create") { + s = sanity_test->Create(); + } else { + assert(command == "verify"); + s = sanity_test->Verify(); + } + fprintf(stderr, "%s\n", s.ToString().c_str()); + if (!s.ok()) { + fprintf(stderr, "FAIL\n"); + return false; + } + + delete sanity_test; + } + return true; +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + std::string path, command; + bool ok = (argc == 3); + if (ok) { + path = std::string(argv[1]); + command = std::string(argv[2]); + ok = (command == "create" || command == "verify"); + } + if (!ok) { + fprintf(stderr, "Usage: %s [create|verify] \n", argv[0]); + exit(1); + } + if (path.back() != '/') { + path += "/"; + } + + bool sanity_ok = rocksdb::RunSanityTests(command, path); + + return sanity_ok ? 0 : 1; +} diff --git a/util/env.cc b/util/env.cc index 245767acb..573176e6e 100644 --- a/util/env.cc +++ b/util/env.cc @@ -49,7 +49,7 @@ LogBuffer::LogBuffer(const InfoLogLevel log_level, LogBuffer::~LogBuffer() { delete rep_; } -void LogBuffer::AddLogToBuffer(const char* format, ...) { +void LogBuffer::AddLogToBuffer(const char* format, va_list ap) { if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { // Skip the level because of its level. 
return; @@ -69,10 +69,10 @@ void LogBuffer::AddLogToBuffer(const char* format, ...) { // Print the message if (p < limit) { - va_list ap; - va_start(ap, format); - p += vsnprintf(p, limit - p, format, ap); - va_end(ap); + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); } // Add '\0' to the end @@ -102,7 +102,7 @@ void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { if (log_buffer != nullptr) { va_list ap; va_start(ap, format); - log_buffer->AddLogToBuffer(format); + log_buffer->AddLogToBuffer(format, ap); va_end(ap); } } diff --git a/util/env_posix.cc b/util/env_posix.cc index fcfea28ab..e019d6af0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -678,10 +678,20 @@ class PosixWritableFile : public WritableFile { Status s; s = Flush(); // flush cache to OS if (!s.ok()) { + return s; } TEST_KILL_RANDOM(rocksdb_kill_odds); + size_t block_size; + size_t last_allocated_block; + GetPreallocationStatus(&block_size, &last_allocated_block); + if (last_allocated_block > 0) { + // trim the extra space preallocated at the end of the file + int dummy __attribute__((unused)); + dummy = ftruncate(fd_, filesize_); // ignore errors + } + if (close(fd_) < 0) { if (s.ok()) { s = IOError(filename_, errno); diff --git a/util/env_test.cc b/util/env_test.cc index eb2829303..a442e3a5c 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -12,6 +12,11 @@ #include #include +#ifdef OS_LINUX +#include +#include +#endif + #include "rocksdb/env.h" #include "port/port.h" #include "util/coding.h" @@ -258,6 +263,41 @@ TEST(EnvPosixTest, RandomAccessUniqueID) { env_->DeleteFile(fname); } +// only works in linux platforms +#ifdef ROCKSDB_FALLOCATE_PRESENT +TEST(EnvPosixTest, AllocateTest) { + std::string fname = GetOnDiskTestDir() + "/preallocate_testfile"; + EnvOptions soptions; + soptions.use_mmap_writes = false; + unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + // allocate 100 MB + size_t kPreallocateSize = 100 * 1024 * 1024; + size_t kBlockSize = 512; + size_t kPageSize = 4096; + std::string data = "test"; + wfile->SetPreallocationBlockSize(kPreallocateSize); + ASSERT_OK(wfile->Append(Slice(data))); + ASSERT_OK(wfile->Flush()); + + struct stat f_stat; + stat(fname.c_str(), &f_stat); + ASSERT_EQ(data.size(), f_stat.st_size); + // verify that blocks are preallocated + ASSERT_EQ(kPreallocateSize / kBlockSize, f_stat.st_blocks); + + // close the file, should deallocate the blocks + wfile.reset(); + + stat(fname.c_str(), &f_stat); + ASSERT_EQ(data.size(), f_stat.st_size); + // verify that preallocated blocks were deallocated on file close + size_t data_blocks_pages = ((data.size() + kPageSize - 1) / kPageSize); + ASSERT_EQ(data_blocks_pages * kPageSize / kBlockSize, f_stat.st_blocks); +} +#endif + // Returns true if any of the strings in ss are the prefix of another string. 
 bool HasPrefix(const std::unordered_set<std::string>& ss) {
   for (const std::string& s: ss) {
diff --git a/util/thread_local.cc b/util/thread_local.cc
index 2e5d3618b..1b4220b8f 100644
--- a/util/thread_local.cc
+++ b/util/thread_local.cc
@@ -138,12 +138,25 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {
   return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed);
 }
 
-void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs) {
+bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,
+                                                void*& expected) {
+  auto* tls = GetThreadLocal();
+  if (UNLIKELY(id >= tls->entries.size())) {
+    // Need mutex to protect entries access within ReclaimId
+    MutexLock l(&mutex_);
+    tls->entries.resize(id + 1);
+  }
+  return tls->entries[id].ptr.compare_exchange_strong(expected, ptr,
+      std::memory_order_relaxed, std::memory_order_relaxed);
+}
+
+void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,
+                                        void* const replacement) {
   MutexLock l(&mutex_);
   for (ThreadData* t = head_.next; t != &head_; t = t->next) {
     if (id < t->entries.size()) {
       void* ptr =
-          t->entries[id].ptr.exchange(nullptr, std::memory_order_relaxed);
+          t->entries[id].ptr.exchange(replacement, std::memory_order_relaxed);
       if (ptr != nullptr) {
         ptrs->push_back(ptr);
       }
@@ -225,8 +238,12 @@ void* ThreadLocalPtr::Swap(void* ptr) {
   return StaticMeta::Instance()->Swap(id_, ptr);
 }
 
-void ThreadLocalPtr::Scrape(autovector<void*>* ptrs) {
-  StaticMeta::Instance()->Scrape(id_, ptrs);
+bool ThreadLocalPtr::CompareAndSwap(void* ptr, void*& expected) {
+  return StaticMeta::Instance()->CompareAndSwap(id_, ptr, expected);
+}
+
+void ThreadLocalPtr::Scrape(autovector<void*>* ptrs, void* const replacement) {
+  StaticMeta::Instance()->Scrape(id_, ptrs, replacement);
 }
 
 }  // namespace rocksdb
diff --git a/util/thread_local.h b/util/thread_local.h
index 1dfc3cf13..a7728ed64 100644
--- a/util/thread_local.h
+++ b/util/thread_local.h
@@ -48,9 +48,15 @@ class ThreadLocalPtr {
   // Atomically swap the supplied ptr and return the previous value
   void* Swap(void* ptr);
 
-  // Return non-nullptr data for all existing threads and reset them
-  // to nullptr
-  void Scrape(autovector<void*>* ptrs);
+  // Atomically compare the stored value with expected. Set the new
+  // pointer value to thread local storage only if the comparison succeeds.
+  // Otherwise, expected is updated with the currently stored value.
+  // Returns true on success, false on failure.
+  bool CompareAndSwap(void* ptr, void*& expected);
+
+  // Reset all thread local data to replacement, and return non-nullptr
+  // data for all existing threads
+  void Scrape(autovector<void*>* ptrs, void* const replacement);
 
  protected:
   struct Entry {
@@ -100,8 +106,12 @@ class ThreadLocalPtr {
     void Reset(uint32_t id, void* ptr);
     // Atomically swap the supplied ptr and return the previous value
     void* Swap(uint32_t id, void* ptr);
-    // Return data for all existing threads and return them to nullptr
-    void Scrape(uint32_t id, autovector<void*>* ptrs);
+    // Atomically compare the stored value with expected; swap in ptr only
+    // if they are equal.
+    bool CompareAndSwap(uint32_t id, void* ptr, void*& expected);
+    // Reset all thread local data to replacement, and return non-nullptr
+    // data for all existing threads
+    void Scrape(uint32_t id, autovector<void*>* ptrs, void* const replacement);
 
     // Register the UnrefHandler for id
     void SetHandler(uint32_t id, UnrefHandler handler);
diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc
index 96e35d959..b1a865d5e 100644
--- a/util/thread_local_test.cc
+++ b/util/thread_local_test.cc
@@ -435,8 +435,8 @@ TEST(ThreadLocalTest, Scrape) {
       // Scrape all thread local data. No unref at thread
       // exit or ThreadLocalPtr destruction
       autovector<void*> ptrs;
-      p.tls1.Scrape(&ptrs);
-      p.tls2->Scrape(&ptrs);
+      p.tls1.Scrape(&ptrs, nullptr);
+      p.tls2->Scrape(&ptrs, nullptr);
       delete p.tls2;
       // Signal to exit
       mu.Lock();
@@ -449,6 +449,22 @@ TEST(ThreadLocalTest, Scrape) {
   }
 }
 
+TEST(ThreadLocalTest, CompareAndSwap) {
+  ThreadLocalPtr tls;
+  ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
+  void* expected = reinterpret_cast<void*>(1);
+  // Swap in 2
+  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
+  expected = reinterpret_cast<void*>(100);
+  // Fail Swap, still 2
+  ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
+  ASSERT_EQ(expected, reinterpret_cast<void*>(2));
+  // Swap in 3
+  expected = reinterpret_cast<void*>(2);
+  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
+  ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
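
The new acquire/release path in db_impl.cc can be read on its own: a reader Swap()s kSVInUse into its thread-local slot, reuses the cached SuperVersion if its version number still matches, and on the way out CompareAndSwap()s the SuperVersion back only if the slot still holds kSVInUse, while InstallSuperVersion()'s Scrape() writes kSVObsolete into every slot to invalidate the caches. Below is a minimal standalone sketch of that protocol, not RocksDB code: a single std::atomic<void*> stands in for one thread's ThreadLocalPtr entry, and the *Sketch names are invented.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>

namespace sketch {

static int dummy = 0;
void* const kSVInUse = &dummy;      // this thread is inside a read
void* const kSVObsolete = nullptr;  // cached entry was scraped or never set

struct SuperVersionSketch {
  uint64_t version_number;
};

// One thread's cached slot; in RocksDB this is the thread's ThreadLocalPtr
// entry for the column family.
std::atomic<void*> tls_slot{kSVObsolete};

SuperVersionSketch* Acquire(uint64_t current_version,
                            SuperVersionSketch* current_sv) {
  // Take exclusive ownership of the slot; nobody else ever writes kSVInUse.
  void* ptr = tls_slot.exchange(kSVInUse, std::memory_order_relaxed);
  assert(ptr != kSVInUse);
  auto* sv = static_cast<SuperVersionSketch*>(ptr);
  if (sv == kSVObsolete || sv->version_number != current_version) {
    // Cache miss or stale copy: this is where the real code takes the DB
    // mutex, unrefs the stale SuperVersion and refs the current one.
    sv = current_sv;
  }
  return sv;
}

void Release(SuperVersionSketch* sv) {
  void* expected = kSVInUse;
  if (tls_slot.compare_exchange_strong(expected, sv,
                                       std::memory_order_relaxed)) {
    return;  // slot untouched since Acquire(): keep sv cached for next time
  }
  // A concurrent scrape replaced the slot with kSVObsolete; the real code
  // unrefs sv here instead of caching it.
  assert(expected == kSVObsolete);
}

}  // namespace sketch

int main() {
  sketch::SuperVersionSketch sv{7};
  sketch::SuperVersionSketch* acquired = sketch::Acquire(7, &sv);
  std::printf("got version %llu\n",
              static_cast<unsigned long long>(acquired->version_number));
  sketch::Release(acquired);
  return 0;
}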
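The rewritten BucketizeIndexesAndFillBloom() computes the plain-table sub-index size in one pass over per-bucket entry counts: a bucket with zero or one entries needs no sub-index record, and every other bucket needs a varint-encoded count plus one fixed-width file offset per entry. A small self-contained check of that sizing rule follows; kOffsetLen = 4 is an assumption here, and VarintLength() is re-implemented locally rather than taken from util/coding.h.

#include <cstdint>
#include <cstdio>
#include <vector>

namespace {

// Bytes needed to varint-encode v, mirroring the usual 7-bits-per-byte rule.
size_t VarintLength(uint64_t v) {
  size_t len = 1;
  while (v >= 128) {
    v >>= 7;
    ++len;
  }
  return len;
}

constexpr size_t kOffsetLen = 4;  // assumption: 4-byte in-file offsets

size_t SubIndexSize(const std::vector<uint32_t>& entries_per_bucket) {
  size_t size = 0;
  for (uint32_t count : entries_per_bucket) {
    if (count <= 1) {
      continue;  // bucket resolved by the main hash table alone
    }
    size += VarintLength(count) + count * kOffsetLen;
  }
  return size;
}

}  // namespace

int main() {
  // Buckets with 0, 1, 2 and 130 entries: only the last two need sub-index
  // space; 130 needs a 2-byte varint, so the total is (1 + 2*4) + (2 + 130*4).
  std::vector<uint32_t> buckets = {0, 1, 2, 130};
  std::printf("sub-index bytes: %zu\n", SubIndexSize(buckets));  // 531
  return 0;
}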
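The util/env.cc change is the standard C variadic-forwarding fix: the "..." wrapper (LogToBuffer) owns va_start()/va_end() and passes the va_list down, and the consumer (AddLogToBuffer) va_copy()s it before handing it to vsnprintf(), instead of each function calling va_start() on its own. A standalone sketch of the pattern, with hypothetical names rather than the LogBuffer class itself:

#include <cstdarg>
#include <cstddef>
#include <cstdio>

// The va_list overload does the formatting; it copies the list so the
// caller's va_list is left untouched.
void AddToBufferV(char* buf, size_t len, const char* format, va_list ap) {
  va_list backup_ap;
  va_copy(backup_ap, ap);
  vsnprintf(buf, len, format, backup_ap);
  va_end(backup_ap);
}

// The variadic wrapper owns va_start/va_end and forwards the arguments.
void LogToBufferSketch(char* buf, size_t len, const char* format, ...) {
  va_list ap;
  va_start(ap, format);
  AddToBufferV(buf, len, format, ap);
  va_end(ap);
}

int main() {
  char buf[64];
  LogToBufferSketch(buf, sizeof(buf), "compacted %d files at level %d", 4, 2);
  std::printf("%s\n", buf);
  return 0;
}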
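The util/env_posix.cc change deals with preallocated space left over at file close: if blocks were reserved past the logical size, PosixWritableFile::Close() now ftruncate()s back to filesize_ (ignoring errors) so the filesystem reclaims them, which is what the new EnvPosixTest.AllocateTest verifies via st_blocks. A rough POSIX-only illustration of the same idea, with a hypothetical temp-file path and most error handling reduced to asserts:

#include <cassert>
#include <fcntl.h>
#include <unistd.h>

int main() {
  const char* path = "/tmp/prealloc_trim_example";  // hypothetical path
  int fd = open(path, O_CREAT | O_TRUNC | O_WRONLY, 0644);
  assert(fd >= 0);

  const off_t written = 4;  // logical size we actually wrote
  assert(write(fd, "test", written) == written);

#if defined(__linux__)
  // Stand-in for the preallocation done through Allocate(): reserve blocks
  // well past the written size.
  posix_fallocate(fd, 0, 100 * 1024 * 1024);
#endif

  // The trim sketched here: drop the unused preallocated tail before close,
  // ignoring any error, just like the patch does.
  ftruncate(fd, written);
  close(fd);
  unlink(path);
  return 0;
}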