Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_impl.h
main
Igor Canadi 11 years ago
commit 1e0d47276c
  1. 3
      Makefile
  2. 12
      db/column_family.cc
  3. 16
      db/column_family.h
  4. 45
      db/db_impl.cc
  5. 2
      include/rocksdb/env.h
  6. 6
      include/rocksdb/statistics.h
  7. 67
      table/plain_table_reader.cc
  8. 16
      table/plain_table_reader.h
  9. 201
      tools/db_sanity_test.cc
  10. 12
      util/env.cc
  11. 10
      util/env_posix.cc
  12. 40
      util/env_test.cc
  13. 25
      util/thread_local.cc
  14. 20
      util/thread_local.h
  15. 20
      util/thread_local_test.cc

@ -235,6 +235,9 @@ block_hash_index_test: table/block_hash_index_test.o $(LIBOBJECTS) $(TESTHARNESS
db_stress: tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) tools/db_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
db_sanity_test: tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) tools/db_sanity_test.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
db_repl_stress: tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) tools/db_repl_stress.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

@ -111,6 +111,9 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
return result;
}
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
SuperVersion::~SuperVersion() {
for (auto td : to_delete) {
@ -153,6 +156,10 @@ void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
namespace {
void SuperVersionUnrefHandle(void* ptr) {
// UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
// destroyed. When former happens, the thread shouldn't see kSVInUse.
// When latter happens, we are in ~ColumnFamilyData(), no get should happen as
// well.
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->db_mutex->Lock();
@ -299,9 +306,12 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
autovector<void*> sv_ptrs;
local_sv_->Scrape(&sv_ptrs);
local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
for (auto ptr : sv_ptrs) {
assert(ptr);
if (ptr == SuperVersion::kSVInUse) {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->Cleanup();

@ -92,6 +92,15 @@ struct SuperVersion {
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
// by thread. This way, the value of kSVInUse is guaranteed to have no
// conflict with SuperVersion object address and portable on different
// platform.
static int dummy;
static void* const kSVInUse;
static void* const kSVObsolete;
};
extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
@ -163,12 +172,7 @@ class ColumnFamilyData {
}
SuperVersion* GetSuperVersion() const { return super_version_; }
SuperVersion* GetAndResetThreadLocalSuperVersion() const {
return static_cast<SuperVersion*>(local_sv_->Swap(nullptr));
}
void SetThreadLocalSuperVersion(SuperVersion* super_version) {
local_sv_->Reset(static_cast<void*>(super_version));
}
ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); }
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}

@ -1263,10 +1263,6 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
if (s.ok()) {
InstallSuperVersion(cfd, deletion_state);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
cfd->ResetThreadLocalSuperVersions();
}
if (madeProgress) {
*madeProgress = 1;
}
@ -2876,6 +2872,10 @@ void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
cfd->InstallSuperVersion(new_superversion, &mutex_);
deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion);
// Reset SuperVersions cached in thread local storage
if (options_.allow_thread_local) {
cfd->ResetThreadLocalSuperVersions();
}
}
Status DBImpl::GetImpl(const ReadOptions& options,
@ -2897,6 +2897,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Acquire SuperVersion
SuperVersion* sv = nullptr;
ThreadLocalPtr* thread_local_sv = nullptr;
if (LIKELY(options_.allow_thread_local)) {
// The SuperVersion is cached in thread local storage to avoid acquiring
// mutex when SuperVersion does not change since the last use. When a new
@ -2907,9 +2908,17 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. It will eventually get refreshed either
// on the next GetImpl() call or next SuperVersion installation.
sv = cfd->GetAndResetThreadLocalSuperVersion();
if (!sv || sv->version_number != cfd->GetSuperVersionNumber()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES);
thread_local_sv = cfd->GetThreadLocalSuperVersion();
void* ptr = thread_local_sv->Swap(SuperVersion::kSVInUse);
// Invariant:
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse during a GetImpl.
assert(ptr != SuperVersion::kSVInUse);
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != cfd->GetSuperVersionNumber()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
@ -2972,11 +2981,25 @@ Status DBImpl::GetImpl(const ReadOptions& options,
mutex_.Unlock();
}
// Release SuperVersion
bool unref_sv = true;
if (LIKELY(options_.allow_thread_local)) {
// Put the SuperVersion back
cfd->SetThreadLocalSuperVersion(sv);
} else {
void* expected = SuperVersion::kSVInUse;
if (thread_local_sv->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happend. The
// SuperVersion is still current.
unref_sv = false;
} else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
assert(expected == SuperVersion::kSVObsolete);
}
}
if (unref_sv) {
// Release SuperVersion
bool delete_sv = false;
if (sv->Unref()) {
mutex_.Lock();
@ -2987,9 +3010,9 @@ Status DBImpl::GetImpl(const ReadOptions& options,
if (delete_sv) {
delete sv;
}
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES);
}
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
RecordTick(options_.statistics.get(), BYTES_READ, value->size());
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);

@ -584,7 +584,7 @@ class LogBuffer {
~LogBuffer();
// Add a log entry to the buffer.
void AddLogToBuffer(const char* format, ...);
void AddLogToBuffer(const char* format, va_list ap);
// Flush all buffered log to the info log.
void FlushBufferToLog() const;

@ -122,7 +122,8 @@ enum Tickers {
// Number of table's properties loaded directly from file, without creating
// table reader object.
NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
NUMBER_SUPERVERSION_UPDATES,
NUMBER_SUPERVERSION_ACQUIRES,
NUMBER_SUPERVERSION_RELEASES,
TICKER_ENUM_MAX
};
@ -178,7 +179,8 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"},
{NUMBER_DIRECT_LOAD_TABLE_PROPERTIES,
"rocksdb.number.direct.load.table.properties"},
{NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"},
{NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"},
{NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"},
};
/**

@ -287,8 +287,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) {
size_t PlainTableReader::BucketizeIndexesAndFillBloom(
IndexRecordList* record_list, std::vector<IndexRecord*>* hash_to_offsets,
std::vector<uint32_t>* bucket_count) {
size_t sub_index_size_needed = 0;
std::vector<uint32_t>* entries_per_bucket) {
bool first = true;
uint32_t prev_hash = 0;
size_t num_records = record_list->GetNumRecords();
@ -306,36 +305,31 @@ size_t PlainTableReader::BucketizeIndexesAndFillBloom(
IndexRecord* prev_bucket_head = (*hash_to_offsets)[bucket];
index_record->next = prev_bucket_head;
(*hash_to_offsets)[bucket] = index_record;
auto& item_count = (*bucket_count)[bucket];
if (item_count > 0) {
if (item_count == 1) {
sub_index_size_needed += kOffsetLen + 1;
}
if (item_count == 127) {
// Need more than one byte for length
sub_index_size_needed++;
}
sub_index_size_needed += kOffsetLen;
(*entries_per_bucket)[bucket]++;
}
size_t sub_index_size = 0;
for (auto entry_count : *entries_per_bucket) {
if (entry_count <= 1) {
continue;
}
item_count++;
// Only buckets with more than 1 entry will have subindex.
sub_index_size += VarintLength(entry_count);
// total bytes needed to store these entries' in-file offsets.
sub_index_size += entry_count * kOffsetLen;
}
return sub_index_size_needed;
return sub_index_size;
}
void PlainTableReader::FillIndexes(
size_t sub_index_size_needed,
const size_t kSubIndexSize,
const std::vector<IndexRecord*>& hash_to_offsets,
const std::vector<uint32_t>& bucket_count) {
Log(options_.info_log, "Reserving %zu bytes for sub index",
sub_index_size_needed);
// 8 bytes buffer for variable length size
size_t buffer_size = 8 * 8;
size_t buffer_used = 0;
sub_index_size_needed += buffer_size;
sub_index_.reset(new char[sub_index_size_needed]);
const std::vector<uint32_t>& entries_per_bucket) {
Log(options_.info_log, "Reserving %zu bytes for plain table's sub_index",
kSubIndexSize);
sub_index_.reset(new char[kSubIndexSize]);
size_t sub_index_offset = 0;
for (int i = 0; i < index_size_; i++) {
uint32_t num_keys_for_bucket = bucket_count[i];
uint32_t num_keys_for_bucket = entries_per_bucket[i];
switch (num_keys_for_bucket) {
case 0:
// No key for bucket
@ -351,21 +345,6 @@ void PlainTableReader::FillIndexes(
char* prev_ptr = &sub_index_[sub_index_offset];
char* cur_ptr = EncodeVarint32(prev_ptr, num_keys_for_bucket);
sub_index_offset += (cur_ptr - prev_ptr);
if (cur_ptr - prev_ptr > 2
|| (cur_ptr - prev_ptr == 2 && num_keys_for_bucket <= 127)) {
// Need to resize sub_index. Exponentially grow buffer.
buffer_used += cur_ptr - prev_ptr - 1;
if (buffer_used + 4 > buffer_size) {
Log(options_.info_log, "Recalculate suffix_map length to %zu",
sub_index_size_needed);
sub_index_size_needed += buffer_size;
buffer_size *= 2;
char* new_sub_index = new char[sub_index_size_needed];
memcpy(new_sub_index, sub_index_.get(), sub_index_offset);
sub_index_.reset(new_sub_index);
}
}
char* sub_index_pos = &sub_index_[sub_index_offset];
IndexRecord* record = hash_to_offsets[i];
int j;
@ -375,12 +354,14 @@ void PlainTableReader::FillIndexes(
}
assert(j == -1 && record == nullptr);
sub_index_offset += kOffsetLen * num_keys_for_bucket;
assert(sub_index_offset <= kSubIndexSize);
break;
}
}
assert(sub_index_offset == kSubIndexSize);
Log(options_.info_log, "hash table size: %d, suffix_map length %zu",
index_size_, sub_index_size_needed);
index_size_, kSubIndexSize);
}
Status PlainTableReader::PopulateIndex() {
@ -422,11 +403,11 @@ Status PlainTableReader::PopulateIndex() {
// Bucketize all the index records to a temp data structure, in which for
// each bucket, we generate a linked list of IndexRecord, in reversed order.
std::vector<IndexRecord*> hash_to_offsets(index_size_, nullptr);
std::vector<uint32_t> bucket_count(index_size_, 0);
std::vector<uint32_t> entries_per_bucket(index_size_, 0);
size_t sub_index_size_needed = BucketizeIndexesAndFillBloom(
&record_list, &hash_to_offsets, &bucket_count);
&record_list, &hash_to_offsets, &entries_per_bucket);
// From the temp data structure, populate indexes.
FillIndexes(sub_index_size_needed, hash_to_offsets, bucket_count);
FillIndexes(sub_index_size_needed, hash_to_offsets, entries_per_bucket);
return Status::OK();
}

@ -196,18 +196,18 @@ class PlainTableReader: public TableReader {
// containing a linklist of IndexRecord hashed to the same bucket, in reverse
// order.
// of offsets for the hash, in reversed order.
// bucket_count is sized of index_size_. The value is how many index
// entries_per_bucket is sized of index_size_. The value is how many index
// records are there in bucket_headers for the same bucket.
size_t BucketizeIndexesAndFillBloom(IndexRecordList* record_list,
std::vector<IndexRecord*>* bucket_headers,
std::vector<uint32_t>* bucket_count);
size_t BucketizeIndexesAndFillBloom(
IndexRecordList* record_list, std::vector<IndexRecord*>* bucket_headers,
std::vector<uint32_t>* entries_per_bucket);
// Internal helper class to fill the indexes and bloom filters to internal
// data structures. bucket_headers and bucket_count are bucketized indexes
// and counts generated by BucketizeIndexesAndFillBloom().
void FillIndexes(size_t sub_index_size_needed,
// data structures. bucket_headers and entries_per_bucket are bucketized
// indexes and counts generated by BucketizeIndexesAndFillBloom().
void FillIndexes(const size_t kSubIndexSize,
const std::vector<IndexRecord*>& bucket_headers,
const std::vector<uint32_t>& bucket_count);
const std::vector<uint32_t>& entries_per_bucket);
// Read a plain table key from the position `start`. The read content
// will be written to `key` and the size of read bytes will be populated

@ -0,0 +1,201 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <cstdio>
#include <vector>
#include <memory>
#include "include/rocksdb/db.h"
#include "include/rocksdb/options.h"
#include "include/rocksdb/env.h"
#include "include/rocksdb/slice.h"
#include "include/rocksdb/status.h"
#include "include/rocksdb/comparator.h"
#include "include/rocksdb/table.h"
#include "include/rocksdb/slice_transform.h"
namespace rocksdb {
class SanityTest {
public:
explicit SanityTest(const std::string& path)
: env_(Env::Default()), path_(path) {
env_->CreateDirIfMissing(path);
}
virtual ~SanityTest() {}
virtual std::string Name() const = 0;
virtual Options GetOptions() const = 0;
Status Create() {
Options options = GetOptions();
options.create_if_missing = true;
std::string dbname = path_ + Name();
DestroyDB(dbname, options);
DB* db;
Status s = DB::Open(options, dbname, &db);
std::unique_ptr<DB> db_guard(db);
if (!s.ok()) {
return s;
}
for (int i = 0; i < 1000000; ++i) {
std::string k = "key" + std::to_string(i);
std::string v = "value" + std::to_string(i);
s = db->Put(WriteOptions(), Slice(k), Slice(v));
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status Verify() {
DB* db;
std::string dbname = path_ + Name();
Status s = DB::Open(GetOptions(), dbname, &db);
std::unique_ptr<DB> db_guard(db);
if (!s.ok()) {
return s;
}
for (int i = 0; i < 1000000; ++i) {
std::string k = "key" + std::to_string(i);
std::string v = "value" + std::to_string(i);
std::string result;
s = db->Get(ReadOptions(), Slice(k), &result);
if (!s.ok()) {
return s;
}
if (result != v) {
return Status::Corruption("Unexpected value for key " + k);
}
}
return Status::OK();
}
private:
Env* env_;
std::string const path_;
};
class SanityTestBasic : public SanityTest {
public:
explicit SanityTestBasic(const std::string& path) : SanityTest(path) {}
virtual Options GetOptions() const {
Options options;
options.create_if_missing = true;
return options;
}
virtual std::string Name() const { return "Basic"; }
};
class SanityTestSpecialComparator : public SanityTest {
public:
explicit SanityTestSpecialComparator(const std::string& path)
: SanityTest(path) {
options_.comparator = new NewComparator();
}
~SanityTestSpecialComparator() { delete options_.comparator; }
virtual Options GetOptions() const { return options_; }
virtual std::string Name() const { return "SpecialComparator"; }
private:
class NewComparator : public Comparator {
public:
virtual const char* Name() const { return "rocksdb.NewComparator"; }
virtual int Compare(const Slice& a, const Slice& b) const {
return BytewiseComparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* s, const Slice& l) const {
BytewiseComparator()->FindShortestSeparator(s, l);
}
virtual void FindShortSuccessor(std::string* key) const {
BytewiseComparator()->FindShortSuccessor(key);
}
};
Options options_;
};
class SanityTestZlibCompression : public SanityTest {
public:
explicit SanityTestZlibCompression(const std::string& path)
: SanityTest(path) {
options_.compression = kZlibCompression;
}
virtual Options GetOptions() const { return options_; }
virtual std::string Name() const { return "ZlibCompression"; }
private:
Options options_;
};
class SanityTestPlainTableFactory : public SanityTest {
public:
explicit SanityTestPlainTableFactory(const std::string& path)
: SanityTest(path) {
options_.table_factory.reset(NewPlainTableFactory());
options_.prefix_extractor = NewFixedPrefixTransform(2);
options_.allow_mmap_reads = true;
}
~SanityTestPlainTableFactory() { delete options_.prefix_extractor; }
virtual Options GetOptions() const { return options_; }
virtual std::string Name() const { return "PlainTable"; }
private:
Options options_;
};
bool RunSanityTests(const std::string& command, const std::string& path) {
std::vector<SanityTest*> sanity_tests = {
new SanityTestBasic(path),
new SanityTestSpecialComparator(path),
new SanityTestZlibCompression(path),
new SanityTestPlainTableFactory(path)};
if (command == "create") {
fprintf(stderr, "Creating...\n");
} else {
fprintf(stderr, "Verifying...\n");
}
for (auto sanity_test : sanity_tests) {
Status s;
fprintf(stderr, "%s -- ", sanity_test->Name().c_str());
if (command == "create") {
s = sanity_test->Create();
} else {
assert(command == "verify");
s = sanity_test->Verify();
}
fprintf(stderr, "%s\n", s.ToString().c_str());
if (!s.ok()) {
fprintf(stderr, "FAIL\n");
return false;
}
delete sanity_test;
}
return true;
}
} // namespace rocksdb
int main(int argc, char** argv) {
std::string path, command;
bool ok = (argc == 3);
if (ok) {
path = std::string(argv[1]);
command = std::string(argv[2]);
ok = (command == "create" || command == "verify");
}
if (!ok) {
fprintf(stderr, "Usage: %s <path> [create|verify] \n", argv[0]);
exit(1);
}
if (path.back() != '/') {
path += "/";
}
bool sanity_ok = rocksdb::RunSanityTests(command, path);
return sanity_ok ? 0 : 1;
}

@ -49,7 +49,7 @@ LogBuffer::LogBuffer(const InfoLogLevel log_level,
LogBuffer::~LogBuffer() { delete rep_; }
void LogBuffer::AddLogToBuffer(const char* format, ...) {
void LogBuffer::AddLogToBuffer(const char* format, va_list ap) {
if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) {
// Skip the level because of its level.
return;
@ -69,10 +69,10 @@ void LogBuffer::AddLogToBuffer(const char* format, ...) {
// Print the message
if (p < limit) {
va_list ap;
va_start(ap, format);
p += vsnprintf(p, limit - p, format, ap);
va_end(ap);
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Add '\0' to the end
@ -102,7 +102,7 @@ void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) {
if (log_buffer != nullptr) {
va_list ap;
va_start(ap, format);
log_buffer->AddLogToBuffer(format);
log_buffer->AddLogToBuffer(format, ap);
va_end(ap);
}
}

@ -678,10 +678,20 @@ class PosixWritableFile : public WritableFile {
Status s;
s = Flush(); // flush cache to OS
if (!s.ok()) {
return s;
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
size_t block_size;
size_t last_allocated_block;
GetPreallocationStatus(&block_size, &last_allocated_block);
if (last_allocated_block > 0) {
// trim the extra space preallocated at the end of the file
int dummy __attribute__((unused));
dummy = ftruncate(fd_, filesize_); // ignore errors
}
if (close(fd_) < 0) {
if (s.ok()) {
s = IOError(filename_, errno);

@ -12,6 +12,11 @@
#include <iostream>
#include <unordered_set>
#ifdef OS_LINUX
#include <sys/stat.h>
#include <unistd.h>
#endif
#include "rocksdb/env.h"
#include "port/port.h"
#include "util/coding.h"
@ -258,6 +263,41 @@ TEST(EnvPosixTest, RandomAccessUniqueID) {
env_->DeleteFile(fname);
}
// only works in linux platforms
#ifdef ROCKSDB_FALLOCATE_PRESENT
TEST(EnvPosixTest, AllocateTest) {
std::string fname = GetOnDiskTestDir() + "/preallocate_testfile";
EnvOptions soptions;
soptions.use_mmap_writes = false;
unique_ptr<WritableFile> wfile;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
// allocate 100 MB
size_t kPreallocateSize = 100 * 1024 * 1024;
size_t kBlockSize = 512;
size_t kPageSize = 4096;
std::string data = "test";
wfile->SetPreallocationBlockSize(kPreallocateSize);
ASSERT_OK(wfile->Append(Slice(data)));
ASSERT_OK(wfile->Flush());
struct stat f_stat;
stat(fname.c_str(), &f_stat);
ASSERT_EQ(data.size(), f_stat.st_size);
// verify that blocks are preallocated
ASSERT_EQ(kPreallocateSize / kBlockSize, f_stat.st_blocks);
// close the file, should deallocate the blocks
wfile.reset();
stat(fname.c_str(), &f_stat);
ASSERT_EQ(data.size(), f_stat.st_size);
// verify that preallocated blocks were deallocated on file close
size_t data_blocks_pages = ((data.size() + kPageSize - 1) / kPageSize);
ASSERT_EQ(data_blocks_pages * kPageSize / kBlockSize, f_stat.st_blocks);
}
#endif
// Returns true if any of the strings in ss are the prefix of another string.
bool HasPrefix(const std::unordered_set<std::string>& ss) {
for (const std::string& s: ss) {

@ -138,12 +138,25 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {
return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed);
}
void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs) {
bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,
void*& expected) {
auto* tls = GetThreadLocal();
if (UNLIKELY(id >= tls->entries.size())) {
// Need mutex to protect entries access within ReclaimId
MutexLock l(&mutex_);
tls->entries.resize(id + 1);
}
return tls->entries[id].ptr.compare_exchange_strong(expected, ptr,
std::memory_order_relaxed, std::memory_order_relaxed);
}
void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,
void* const replacement) {
MutexLock l(&mutex_);
for (ThreadData* t = head_.next; t != &head_; t = t->next) {
if (id < t->entries.size()) {
void* ptr =
t->entries[id].ptr.exchange(nullptr, std::memory_order_relaxed);
t->entries[id].ptr.exchange(replacement, std::memory_order_relaxed);
if (ptr != nullptr) {
ptrs->push_back(ptr);
}
@ -225,8 +238,12 @@ void* ThreadLocalPtr::Swap(void* ptr) {
return StaticMeta::Instance()->Swap(id_, ptr);
}
void ThreadLocalPtr::Scrape(autovector<void*>* ptrs) {
StaticMeta::Instance()->Scrape(id_, ptrs);
bool ThreadLocalPtr::CompareAndSwap(void* ptr, void*& expected) {
return StaticMeta::Instance()->CompareAndSwap(id_, ptr, expected);
}
void ThreadLocalPtr::Scrape(autovector<void*>* ptrs, void* const replacement) {
StaticMeta::Instance()->Scrape(id_, ptrs, replacement);
}
} // namespace rocksdb

@ -48,9 +48,15 @@ class ThreadLocalPtr {
// Atomically swap the supplied ptr and return the previous value
void* Swap(void* ptr);
// Return non-nullptr data for all existing threads and reset them
// to nullptr
void Scrape(autovector<void*>* ptrs);
// Atomically compare the stored value with expected. Set the new
// pointer value to thread local only if the comparision is true.
// Otherwise, expected returns the stored value.
// Return true on success, false on failure
bool CompareAndSwap(void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(autovector<void*>* ptrs, void* const replacement);
protected:
struct Entry {
@ -100,8 +106,12 @@ class ThreadLocalPtr {
void Reset(uint32_t id, void* ptr);
// Atomically swap the supplied ptr and return the previous value
void* Swap(uint32_t id, void* ptr);
// Return data for all existing threads and return them to nullptr
void Scrape(uint32_t id, autovector<void*>* ptrs);
// Atomically compare and swap the provided value only if it equals
// to expected value.
bool CompareAndSwap(uint32_t id, void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(uint32_t id, autovector<void*>* ptrs, void* const replacement);
// Register the UnrefHandler for id
void SetHandler(uint32_t id, UnrefHandler handler);

@ -435,8 +435,8 @@ TEST(ThreadLocalTest, Scrape) {
// Scrape all thread local data. No unref at thread
// exit or ThreadLocalPtr destruction
autovector<void*> ptrs;
p.tls1.Scrape(&ptrs);
p.tls2->Scrape(&ptrs);
p.tls1.Scrape(&ptrs, nullptr);
p.tls2->Scrape(&ptrs, nullptr);
delete p.tls2;
// Signal to exit
mu.Lock();
@ -449,6 +449,22 @@ TEST(ThreadLocalTest, Scrape) {
}
}
TEST(ThreadLocalTest, CompareAndSwap) {
ThreadLocalPtr tls;
ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
void* expected = reinterpret_cast<void*>(1);
// Swap in 2
ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
expected = reinterpret_cast<void*>(100);
// Fail Swap, still 2
ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
ASSERT_EQ(expected, reinterpret_cast<void*>(2));
// Swap in 3
expected = reinterpret_cast<void*>(2);
ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
}
} // namespace rocksdb
int main(int argc, char** argv) {

Loading…
Cancel
Save