diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index c026782f6..92839ad4f 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -188,15 +188,6 @@ if [ "$CROSS_COMPILE" = "true" -o "$FBCODE_BUILD" = "true" ]; then # Also don't need any compilation tests if compiling on fbcode true else - # If -std=c++0x works, use . Otherwise use port_posix.h. - $CXX $CFLAGS -std=c++0x -x c++ - -o /dev/null 2>/dev/null < - int main() {} -EOF - if [ "$?" = 0 ]; then - COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_ATOMIC_PRESENT" - fi - # Test whether fallocate is available $CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null < diff --git a/build_tools/fbcode.gcc471.sh b/build_tools/fbcode.gcc471.sh index c971cda5b..b5d886730 100644 --- a/build_tools/fbcode.gcc471.sh +++ b/build_tools/fbcode.gcc471.sh @@ -54,7 +54,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic" CFLAGS+=" -I $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/include -DHAVE_JEMALLOC" CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE" -CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT" +CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT" CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2" EXEC_LDFLAGS=" -Wl,--whole-archive $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/lib/libjemalloc.a" diff --git a/build_tools/fbcode.gcc481.sh b/build_tools/fbcode.gcc481.sh index 5426e3f9a..386ad509b 100644 --- a/build_tools/fbcode.gcc481.sh +++ b/build_tools/fbcode.gcc481.sh @@ -70,7 +70,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic" CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE" -CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT" +CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT" CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2 -DLZ4 -DNUMA" EXEC_LDFLAGS="-Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib/ld.so" diff --git a/db/db_bench.cc b/db/db_bench.cc index f0fe5e02e..1a379e948 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1533,13 +1533,13 @@ class Benchmark { void AcquireLoad(ThreadState* thread) { int dummy; - port::AtomicPointer ap(&dummy); + std::atomic ap; int count = 0; void *ptr = nullptr; thread->stats.AddMessage("(each op is 1000 loads)"); while (count < 100000) { for (int i = 0; i < 1000; i++) { - ptr = ap.Acquire_Load(); + ptr = ap.load(std::memory_order_acquire); } count++; thread->stats.FinishedOps(nullptr, nullptr, 1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 40b94acab..dc5fc2394 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -326,7 +326,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) stats_(db_options_.statistics.get()), db_lock_(nullptr), mutex_(options.use_adaptive_mutex), - shutting_down_(nullptr), + shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), log_empty_(true), @@ -388,7 +388,7 @@ DBImpl::~DBImpl() { } // Wait for background work to finish - shutting_down_.Release_Store(this); // Any non-nullptr value is ok + shutting_down_.store(true, std::memory_order_release); while (bg_compaction_scheduled_ || bg_flush_scheduled_) { bg_cv_.Wait(); } @@ -1615,7 +1615,8 @@ Status DBImpl::FlushMemTableToOutputFile( Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit, &file_number, log_buffer); - if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { + if (s.ok() && + (shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) { s = Status::ShutdownInProgress( "Database shutdown or Column family drop during flush"); } @@ -2014,7 +2015,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bg_schedule_needed_ = false; if (bg_work_gate_closed_) { // gate closed for backgrond work - } else if (shutting_down_.Acquire_Load()) { + } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions } else { bool is_flush_pending = false; @@ -2129,7 +2130,7 @@ void DBImpl::BackgroundCallFlush() { MutexLock l(&mutex_); Status s; - if (!shutting_down_.Acquire_Load()) { + if (!shutting_down_.load(std::memory_order_acquire)) { s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer); if (!s.ok()) { // Wait a little bit before retrying background compaction in @@ -2196,7 +2197,7 @@ void DBImpl::BackgroundCallCompaction() { MutexLock l(&mutex_); assert(bg_compaction_scheduled_); Status s; - if (!shutting_down_.Acquire_Load()) { + if (!shutting_down_.load(std::memory_order_acquire)) { s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer); if (!s.ok()) { // Wait a little bit before retrying background compaction in @@ -2700,7 +2701,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, // flush thread will take care of this return 0; } - if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) { + if (cfd->imm()->imm_flush_needed.load(std::memory_order_relaxed)) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); if (cfd->imm()->IsFlushPending()) { @@ -2762,7 +2763,7 @@ Status DBImpl::ProcessKeyValueCompaction( int64_t key_drop_newer_entry = 0; int64_t key_drop_obsolete = 0; int64_t loop_cnt = 0; - while (input->Valid() && !shutting_down_.Acquire_Load() && + while (input->Valid() && !shutting_down_.load(std::memory_order_acquire) && !cfd->IsDropped()) { if (++loop_cnt > 1000) { if (key_drop_user > 0) { @@ -3222,7 +3223,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, shared_ptr backup_input( versions_->MakeInputIterator(compact->compaction)); backup_input->SeekToFirst(); - while (backup_input->Valid() && !shutting_down_.Acquire_Load() && + while (backup_input->Valid() && + !shutting_down_.load(std::memory_order_acquire) && !cfd->IsDropped()) { // FLUSH preempts compaction // TODO(icanadi) this currently only checks if flush is necessary on @@ -3356,7 +3358,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, log_buffer); } // checking for compaction filter v2 - if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) { + if (status.ok() && + (shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) { status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } diff --git a/db/db_impl.h b/db/db_impl.h index 2d5cfe6c2..f730d6ba4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -488,7 +488,7 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; - port::AtomicPointer shutting_down_; + std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 // * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't diff --git a/db/db_test.cc b/db/db_test.cc index cfd9dcd9b..3ded0ec97 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -121,25 +121,25 @@ static std::string Key(int i) { class SpecialEnv : public EnvWrapper { public: // sstable Sync() calls are blocked while this pointer is non-nullptr. - port::AtomicPointer delay_sstable_sync_; + std::atomic delay_sstable_sync_; // Drop writes on the floor while this pointer is non-nullptr. - port::AtomicPointer drop_writes_; + std::atomic drop_writes_; // Simulate no-space errors while this pointer is non-nullptr. - port::AtomicPointer no_space_; + std::atomic no_space_; // Simulate non-writable file system while this pointer is non-nullptr - port::AtomicPointer non_writable_; + std::atomic non_writable_; // Force sync of manifest files to fail while this pointer is non-nullptr - port::AtomicPointer manifest_sync_error_; + std::atomic manifest_sync_error_; // Force write to manifest files to fail while this pointer is non-nullptr - port::AtomicPointer manifest_write_error_; + std::atomic manifest_write_error_; // Force write to log files to fail while this pointer is non-nullptr - port::AtomicPointer log_write_error_; + std::atomic log_write_error_; bool count_random_reads_; anon::AtomicCounter random_read_counter_; @@ -154,15 +154,15 @@ class SpecialEnv : public EnvWrapper { std::atomic sync_counter_; explicit SpecialEnv(Env* base) : EnvWrapper(base) { - delay_sstable_sync_.Release_Store(nullptr); - drop_writes_.Release_Store(nullptr); - no_space_.Release_Store(nullptr); - non_writable_.Release_Store(nullptr); + delay_sstable_sync_.store(false, std::memory_order_release); + drop_writes_.store(false, std::memory_order_release); + no_space_.store(false, std::memory_order_release); + non_writable_.store(false, std::memory_order_release); count_random_reads_ = false; count_sequential_reads_ = false; - manifest_sync_error_.Release_Store(nullptr); - manifest_write_error_.Release_Store(nullptr); - log_write_error_.Release_Store(nullptr); + manifest_sync_error_.store(false, std::memory_order_release); + manifest_write_error_.store(false, std::memory_order_release); + log_write_error_.store(false, std::memory_order_release); bytes_written_ = 0; sync_counter_ = 0; } @@ -180,10 +180,10 @@ class SpecialEnv : public EnvWrapper { base_(std::move(base)) { } Status Append(const Slice& data) { - if (env_->drop_writes_.Acquire_Load() != nullptr) { + if (env_->drop_writes_.load(std::memory_order_acquire)) { // Drop writes on the floor return Status::OK(); - } else if (env_->no_space_.Acquire_Load() != nullptr) { + } else if (env_->no_space_.load(std::memory_order_acquire)) { return Status::IOError("No space left on device"); } else { env_->bytes_written_ += data.size(); @@ -194,7 +194,7 @@ class SpecialEnv : public EnvWrapper { Status Flush() { return base_->Flush(); } Status Sync() { ++env_->sync_counter_; - while (env_->delay_sstable_sync_.Acquire_Load() != nullptr) { + while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) { env_->SleepForMicroseconds(100000); } return base_->Sync(); @@ -211,7 +211,7 @@ class SpecialEnv : public EnvWrapper { ManifestFile(SpecialEnv* env, unique_ptr&& b) : env_(env), base_(std::move(b)) { } Status Append(const Slice& data) { - if (env_->manifest_write_error_.Acquire_Load() != nullptr) { + if (env_->manifest_write_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated writer error"); } else { return base_->Append(data); @@ -221,7 +221,7 @@ class SpecialEnv : public EnvWrapper { Status Flush() { return base_->Flush(); } Status Sync() { ++env_->sync_counter_; - if (env_->manifest_sync_error_.Acquire_Load() != nullptr) { + if (env_->manifest_sync_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated sync error"); } else { return base_->Sync(); @@ -236,7 +236,7 @@ class SpecialEnv : public EnvWrapper { LogFile(SpecialEnv* env, unique_ptr&& b) : env_(env), base_(std::move(b)) { } Status Append(const Slice& data) { - if (env_->log_write_error_.Acquire_Load() != nullptr) { + if (env_->log_write_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated writer error"); } else { return base_->Append(data); @@ -250,7 +250,7 @@ class SpecialEnv : public EnvWrapper { } }; - if (non_writable_.Acquire_Load() != nullptr) { + if (non_writable_.load(std::memory_order_acquire)) { return Status::IOError("simulated write error"); } @@ -1211,7 +1211,8 @@ TEST(DBTest, Empty) { handles_[1], "rocksdb.num-entries-active-mem-table", &num)); ASSERT_EQ("1", num); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + // Block sync calls + env_->delay_sstable_sync_.store(true, std::memory_order_release); Put(1, "k1", std::string(100000, 'x')); // Fill memtable ASSERT_TRUE(dbfull()->GetProperty( handles_[1], "rocksdb.num-entries-active-mem-table", &num)); @@ -1223,7 +1224,8 @@ TEST(DBTest, Empty) { ASSERT_EQ("1", num); ASSERT_EQ("v1", Get(1, "foo")); - env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls + // Release sync calls + env_->delay_sstable_sync_.store(false, std::memory_order_release); ASSERT_OK(db_->DisableFileDeletions()); ASSERT_TRUE( @@ -1539,12 +1541,14 @@ TEST(DBTest, GetFromImmutableLayer) { ASSERT_OK(Put(1, "foo", "v1")); ASSERT_EQ("v1", Get(1, "foo")); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + // Block sync calls + env_->delay_sstable_sync_.store(true, std::memory_order_release); Put(1, "k1", std::string(100000, 'x')); // Fill memtable Put(1, "k2", std::string(100000, 'y')); // Trigger flush ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(0, "foo")); - env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls + // Release sync calls + env_->delay_sstable_sync_.store(false, std::memory_order_release); } while (ChangeOptions()); } @@ -5776,7 +5780,8 @@ TEST(DBTest, DropWrites) { ASSERT_EQ("v1", Get("foo")); Compact("a", "z"); const int num_files = CountFiles(); - env_->drop_writes_.Release_Store(env_); // Force out-of-space errors + // Force out-of-space errors + env_->drop_writes_.store(true, std::memory_order_release); env_->sleep_counter_.Reset(); for (int i = 0; i < 5; i++) { for (int level = 0; level < dbfull()->NumberLevels()-1; level++) { @@ -5788,7 +5793,7 @@ TEST(DBTest, DropWrites) { ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value)); ASSERT_EQ("5", property_value); - env_->drop_writes_.Release_Store(nullptr); + env_->drop_writes_.store(false, std::memory_order_release); ASSERT_LT(CountFiles(), num_files + 3); // Check that compaction attempts slept after errors @@ -5805,7 +5810,8 @@ TEST(DBTest, DropWritesFlush) { Reopen(&options); ASSERT_OK(Put("foo", "v1")); - env_->drop_writes_.Release_Store(env_); // Force out-of-space errors + // Force out-of-space errors + env_->drop_writes_.store(true, std::memory_order_release); std::string property_value; // Background error count is 0 now. @@ -5829,7 +5835,7 @@ TEST(DBTest, DropWritesFlush) { } ASSERT_EQ("1", property_value); - env_->drop_writes_.Release_Store(nullptr); + env_->drop_writes_.store(false, std::memory_order_release); } while (ChangeCompactOptions()); } @@ -5848,12 +5854,13 @@ TEST(DBTest, NoSpaceCompactRange) { ASSERT_OK(Flush()); } - env_->no_space_.Release_Store(env_); // Force out-of-space errors + // Force out-of-space errors + env_->no_space_.store(true, std::memory_order_release); Status s = db_->CompactRange(nullptr, nullptr); ASSERT_TRUE(s.IsIOError()); - env_->no_space_.Release_Store(nullptr); + env_->no_space_.store(false, std::memory_order_release); } while (ChangeCompactOptions()); } @@ -5864,7 +5871,8 @@ TEST(DBTest, NonWritableFileSystem) { options.env = env_; Reopen(&options); ASSERT_OK(Put("foo", "v1")); - env_->non_writable_.Release_Store(env_); // Force errors for new files + // Force errors for new files + env_->non_writable_.store(true, std::memory_order_release); std::string big(100000, 'x'); int errors = 0; for (int i = 0; i < 20; i++) { @@ -5874,7 +5882,7 @@ TEST(DBTest, NonWritableFileSystem) { } } ASSERT_GT(errors, 0); - env_->non_writable_.Release_Store(nullptr); + env_->non_writable_.store(false, std::memory_order_release); } while (ChangeCompactOptions()); } @@ -5888,7 +5896,7 @@ TEST(DBTest, ManifestWriteError) { // We iterate twice. In the second iteration, everything is the // same except the log record never makes it to the MANIFEST file. for (int iter = 0; iter < 2; iter++) { - port::AtomicPointer* error_type = (iter == 0) + std::atomic* error_type = (iter == 0) ? &env_->manifest_sync_error_ : &env_->manifest_write_error_; @@ -5909,12 +5917,12 @@ TEST(DBTest, ManifestWriteError) { ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level // Merging compaction (will fail) - error_type->Release_Store(env_); + error_type->store(true, std::memory_order_release); dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail ASSERT_EQ("bar", Get("foo")); // Recovery: should not lose data - error_type->Release_Store(nullptr); + error_type->store(false, std::memory_order_release); Reopen(&options); ASSERT_EQ("bar", Get("foo")); } @@ -5938,10 +5946,10 @@ TEST(DBTest, PutFailsParanoid) { ASSERT_OK(Put(1, "foo", "bar")); ASSERT_OK(Put(1, "foo1", "bar1")); // simulate error - env_->log_write_error_.Release_Store(env_); + env_->log_write_error_.store(true, std::memory_order_release); s = Put(1, "foo2", "bar2"); ASSERT_TRUE(!s.ok()); - env_->log_write_error_.Release_Store(nullptr); + env_->log_write_error_.store(false, std::memory_order_release); s = Put(1, "foo3", "bar3"); // the next put should fail, too ASSERT_TRUE(!s.ok()); @@ -5956,10 +5964,10 @@ TEST(DBTest, PutFailsParanoid) { ASSERT_OK(Put(1, "foo", "bar")); ASSERT_OK(Put(1, "foo1", "bar1")); // simulate error - env_->log_write_error_.Release_Store(env_); + env_->log_write_error_.store(true, std::memory_order_release); s = Put(1, "foo2", "bar2"); ASSERT_TRUE(!s.ok()); - env_->log_write_error_.Release_Store(nullptr); + env_->log_write_error_.store(false, std::memory_order_release); s = Put(1, "foo3", "bar3"); // the next put should NOT fail ASSERT_TRUE(s.ok()); @@ -6005,7 +6013,7 @@ TEST(DBTest, BloomFilter) { Flush(1); // Prevent auto compactions triggered by seeks - env_->delay_sstable_sync_.Release_Store(env_); + env_->delay_sstable_sync_.store(true, std::memory_order_release); // Lookup present keys. Should rarely read from small sstable. env_->random_read_counter_.Reset(); @@ -6026,7 +6034,7 @@ TEST(DBTest, BloomFilter) { fprintf(stderr, "%d missing => %d reads\n", N, reads); ASSERT_LE(reads, 3*N/100); - env_->delay_sstable_sync_.Release_Store(nullptr); + env_->delay_sstable_sync_.store(false, std::memory_order_release); Close(); } while (ChangeCompactOptions()); } @@ -7047,9 +7055,9 @@ static const int kNumKeys = 1000; struct MTState { DBTest* test; - port::AtomicPointer stop; - port::AtomicPointer counter[kNumThreads]; - port::AtomicPointer thread_done[kNumThreads]; + std::atomic stop; + std::atomic counter[kNumThreads]; + std::atomic thread_done[kNumThreads]; }; struct MTThread { @@ -7061,12 +7069,12 @@ static void MTThreadBody(void* arg) { MTThread* t = reinterpret_cast(arg); int id = t->id; DB* db = t->state->test->db_; - uintptr_t counter = 0; + int counter = 0; fprintf(stderr, "... starting thread %d\n", id); Random rnd(1000 + id); char valbuf[1500]; - while (t->state->stop.Acquire_Load() == nullptr) { - t->state->counter[id].Release_Store(reinterpret_cast(counter)); + while (t->state->stop.load(std::memory_order_acquire) == false) { + t->state->counter[id].store(counter, std::memory_order_release); int key = rnd.Uniform(kNumKeys); char keybuf[20]; @@ -7126,8 +7134,7 @@ static void MTThreadBody(void* arg) { ASSERT_EQ(k, key); ASSERT_GE(w, 0); ASSERT_LT(w, kNumThreads); - ASSERT_LE((unsigned int)c, reinterpret_cast( - t->state->counter[w].Acquire_Load())); + ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire)); ASSERT_EQ(cf, i); if (i == 0) { unique_id = u; @@ -7141,7 +7148,7 @@ static void MTThreadBody(void* arg) { } counter++; } - t->state->thread_done[id].Release_Store(t); + t->state->thread_done[id].store(true, std::memory_order_release); fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter)); } @@ -7157,10 +7164,10 @@ TEST(DBTest, MultiThreaded) { // Initialize state MTState mt; mt.test = this; - mt.stop.Release_Store(0); + mt.stop.store(false, std::memory_order_release); for (int id = 0; id < kNumThreads; id++) { - mt.counter[id].Release_Store(0); - mt.thread_done[id].Release_Store(0); + mt.counter[id].store(0, std::memory_order_release); + mt.thread_done[id].store(false, std::memory_order_release); } // Start threads @@ -7175,9 +7182,9 @@ TEST(DBTest, MultiThreaded) { env_->SleepForMicroseconds(kTestSeconds * 1000000); // Stop the threads and wait for them to finish - mt.stop.Release_Store(&mt); + mt.stop.store(true, std::memory_order_release); for (int id = 0; id < kNumThreads; id++) { - while (mt.thread_done[id].Acquire_Load() == nullptr) { + while (mt.thread_done[id].load(std::memory_order_acquire) == false) { env_->SleepForMicroseconds(100000); } } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index bd48f1f47..69325c748 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -114,7 +114,7 @@ void MemTableListVersion::Remove(MemTable* m) { bool MemTableList::IsFlushPending() const { if ((flush_requested_ && num_flush_not_started_ >= 1) || (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { - assert(imm_flush_needed.NoBarrier_Load() != nullptr); + assert(imm_flush_needed.load(std::memory_order_relaxed)); return true; } return false; @@ -129,7 +129,7 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { assert(!m->flush_completed_); num_flush_not_started_--; if (num_flush_not_started_ == 0) { - imm_flush_needed.Release_Store(nullptr); + imm_flush_needed.store(false, std::memory_order_release); } m->flush_in_progress_ = true; // flushing will start very soon ret->push_back(m); @@ -155,7 +155,7 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, num_flush_not_started_++; } pending_outputs->erase(file_number); - imm_flush_needed.Release_Store(reinterpret_cast(1)); + imm_flush_needed.store(true, std::memory_order_release); } // Record a successful flush in the manifest file @@ -236,7 +236,7 @@ Status MemTableList::InstallMemtableFlushResults( num_flush_not_started_++; pending_outputs->erase(m->file_number_); m->file_number_ = 0; - imm_flush_needed.Release_Store((void *)1); + imm_flush_needed.store(true, std::memory_order_release); } ++mem_id; } while (!current_->memlist_.empty() && (m = current_->memlist_.back()) && @@ -259,7 +259,7 @@ void MemTableList::Add(MemTable* m) { m->MarkImmutable(); num_flush_not_started_++; if (num_flush_not_started_ == 1) { - imm_flush_needed.Release_Store((void *)1); + imm_flush_needed.store(true, std::memory_order_release); } } diff --git a/db/memtable_list.h b/db/memtable_list.h index d93c7df92..5e16be5cb 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -78,12 +78,12 @@ class MemTableList { public: // A list of memtables. explicit MemTableList(int min_write_buffer_number_to_merge) - : min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), + : imm_flush_needed(false), + min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), current_(new MemTableListVersion()), num_flush_not_started_(0), commit_in_progress_(false), flush_requested_(false) { - imm_flush_needed.Release_Store(nullptr); current_->Ref(); } ~MemTableList() {} @@ -92,7 +92,7 @@ class MemTableList { // so that background threads can detect non-nullptr pointer to // determine whether there is anything more to start flushing. - port::AtomicPointer imm_flush_needed; + std::atomic imm_flush_needed; // Returns the total number of memtables in the list int size() const; diff --git a/db/skiplist.h b/db/skiplist.h index 751f7c3ec..68c8bceba 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -115,15 +115,14 @@ class SkipList { // Modified only by Insert(). Read racily by readers, but stale // values are ok. - port::AtomicPointer max_height_; // Height of the entire list + std::atomic max_height_; // Height of the entire list // Used for optimizing sequential insert patterns Node** prev_; int32_t prev_height_; inline int GetMaxHeight() const { - return static_cast( - reinterpret_cast(max_height_.NoBarrier_Load())); + return max_height_.load(std::memory_order_relaxed); } // Read/written only by Insert(). @@ -169,35 +168,35 @@ struct SkipList::Node { assert(n >= 0); // Use an 'acquire load' so that we observe a fully initialized // version of the returned Node. - return reinterpret_cast(next_[n].Acquire_Load()); + return (next_[n].load(std::memory_order_acquire)); } void SetNext(int n, Node* x) { assert(n >= 0); // Use a 'release store' so that anybody who reads through this // pointer observes a fully initialized version of the inserted node. - next_[n].Release_Store(x); + next_[n].store(x, std::memory_order_release); } // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next(int n) { assert(n >= 0); - return reinterpret_cast(next_[n].NoBarrier_Load()); + return next_[n].load(std::memory_order_relaxed); } void NoBarrier_SetNext(int n, Node* x) { assert(n >= 0); - next_[n].NoBarrier_Store(x); + next_[n].store(x, std::memory_order_relaxed); } private: // Array of length equal to the node height. next_[0] is lowest level link. - port::AtomicPointer next_[1]; + std::atomic next_[1]; }; template typename SkipList::Node* SkipList::NewNode(const Key& key, int height) { char* mem = arena_->AllocateAligned( - sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); + sizeof(Node) + sizeof(std::atomic) * (height - 1)); return new (mem) Node(key); } @@ -364,7 +363,7 @@ SkipList::SkipList(const Comparator cmp, Arena* arena, compare_(cmp), arena_(arena), head_(NewNode(0 /* any key will do */, max_height)), - max_height_(reinterpret_cast(1)), + max_height_(1), prev_height_(1), rnd_(0xdeadbeef) { assert(kMaxHeight_ > 0); @@ -402,7 +401,7 @@ void SkipList::Insert(const Key& key) { // the loop below. In the former case the reader will // immediately drop to the next level since nullptr sorts after all // keys. In the latter case the reader will use the new node. - max_height_.NoBarrier_Store(reinterpret_cast(height)); + max_height_.store(height, std::memory_order_relaxed); } x = NewNode(key, height); diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index b87ddcbb0..48323b244 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -191,13 +191,11 @@ class ConcurrentTest { // Per-key generation struct State { - port::AtomicPointer generation[K]; - void Set(int k, intptr_t v) { - generation[k].Release_Store(reinterpret_cast(v)); - } - intptr_t Get(int k) { - return reinterpret_cast(generation[k].Acquire_Load()); + std::atomic generation[K]; + void Set(int k, int v) { + generation[k].store(v, std::memory_order_release); } + int Get(int k) { return generation[k].load(std::memory_order_acquire); } State() { for (unsigned int k = 0; k < K; k++) { @@ -221,7 +219,7 @@ class ConcurrentTest { // REQUIRES: External synchronization void WriteStep(Random* rnd) { const uint32_t k = rnd->Next() % K; - const intptr_t g = current_.Get(k) + 1; + const int g = current_.Get(k) + 1; const Key key = MakeKey(k, g); list_.Insert(key); current_.Set(k, g); @@ -303,7 +301,7 @@ class TestState { public: ConcurrentTest t_; int seed_; - port::AtomicPointer quit_flag_; + std::atomic quit_flag_; enum ReaderState { STARTING, @@ -312,10 +310,7 @@ class TestState { }; explicit TestState(int s) - : seed_(s), - quit_flag_(nullptr), - state_(STARTING), - state_cv_(&mu_) {} + : seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {} void Wait(ReaderState s) { mu_.Lock(); @@ -343,7 +338,7 @@ static void ConcurrentReader(void* arg) { Random rnd(state->seed_); int64_t reads = 0; state->Change(TestState::RUNNING); - while (!state->quit_flag_.Acquire_Load()) { + while (!state->quit_flag_.load(std::memory_order_acquire)) { state->t_.ReadStep(&rnd); ++reads; } @@ -365,7 +360,7 @@ static void RunConcurrent(int run) { for (int i = 0; i < kSize; i++) { state.t_.WriteStep(&rnd); } - state.quit_flag_.Release_Store(&state); // Any non-nullptr arg will do + state.quit_flag_.store(true, std::memory_order_release); state.Wait(TestState::DONE); } } diff --git a/port/atomic_pointer.h b/port/atomic_pointer.h deleted file mode 100644 index db3580bde..000000000 --- a/port/atomic_pointer.h +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -// AtomicPointer provides storage for a lock-free pointer. -// Platform-dependent implementation of AtomicPointer: -// - If the platform provides a cheap barrier, we use it with raw pointers -// - If cstdatomic is present (on newer versions of gcc, it is), we use -// a cstdatomic-based AtomicPointer. However we prefer the memory -// barrier based version, because at least on a gcc 4.4 32-bit build -// on linux, we have encountered a buggy -// implementation. Also, some implementations are much -// slower than a memory-barrier based implementation (~16ns for -// based acquire-load vs. ~1ns for a barrier based -// acquire-load). -// This code is based on atomicops-internals-* in Google's perftools: -// http://code.google.com/p/google-perftools/source/browse/#svn%2Ftrunk%2Fsrc%2Fbase - -#ifndef PORT_ATOMIC_POINTER_H_ -#define PORT_ATOMIC_POINTER_H_ - -#include -#ifdef ROCKSDB_ATOMIC_PRESENT -#include -#endif -#ifdef OS_WIN -#include -#endif -#ifdef OS_MACOSX -#include -#endif - -#if defined(_M_X64) || defined(__x86_64__) -#define ARCH_CPU_X86_FAMILY 1 -#elif defined(_M_IX86) || defined(__i386__) || defined(__i386) -#define ARCH_CPU_X86_FAMILY 1 -#elif defined(__ARMEL__) -#define ARCH_CPU_ARM_FAMILY 1 -#endif - -namespace rocksdb { -namespace port { - -// Define MemoryBarrier() if available -// Windows on x86 -#if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY) -// windows.h already provides a MemoryBarrier(void) macro -// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx -#define ROCKSDB_HAVE_MEMORY_BARRIER - -// Gcc on x86 -#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__) -inline void MemoryBarrier() { - // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on - // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. - __asm__ __volatile__("" : : : "memory"); -} -#define ROCKSDB_HAVE_MEMORY_BARRIER - -// Sun Studio -#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC) -inline void MemoryBarrier() { - // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on - // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. - asm volatile("" : : : "memory"); -} -#define ROCKSDB_HAVE_MEMORY_BARRIER - -// Mac OS -#elif defined(OS_MACOSX) -inline void MemoryBarrier() { - OSMemoryBarrier(); -} -#define ROCKSDB_HAVE_MEMORY_BARRIER - -// ARM Linux -#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__) -typedef void (*LinuxKernelMemoryBarrierFunc)(void); -// The Linux ARM kernel provides a highly optimized device-specific memory -// barrier function at a fixed memory address that is mapped in every -// user-level process. -// -// This beats using CPU-specific instructions which are, on single-core -// devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more -// than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking -// shows that the extra function call cost is completely negligible on -// multi-core devices. -// -inline void MemoryBarrier() { - (*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)(); -} -#define ROCKSDB_HAVE_MEMORY_BARRIER - -#endif - -// AtomicPointer built using platform-specific MemoryBarrier() -#if defined(ROCKSDB_HAVE_MEMORY_BARRIER) -class AtomicPointer { - private: - void* rep_; - public: - AtomicPointer() { } - explicit AtomicPointer(void* p) : rep_(p) {} - inline void* NoBarrier_Load() const { return rep_; } - inline void NoBarrier_Store(void* v) { rep_ = v; } - inline void* Acquire_Load() const { - void* result = rep_; - MemoryBarrier(); - return result; - } - inline void Release_Store(void* v) { - MemoryBarrier(); - rep_ = v; - } -}; - -// AtomicPointer based on -#elif defined(ROCKSDB_ATOMIC_PRESENT) -class AtomicPointer { - private: - std::atomic rep_; - public: - AtomicPointer() { } - explicit AtomicPointer(void* v) : rep_(v) { } - inline void* Acquire_Load() const { - return rep_.load(std::memory_order_acquire); - } - inline void Release_Store(void* v) { - rep_.store(v, std::memory_order_release); - } - inline void* NoBarrier_Load() const { - return rep_.load(std::memory_order_relaxed); - } - inline void NoBarrier_Store(void* v) { - rep_.store(v, std::memory_order_relaxed); - } -}; - -// We have neither MemoryBarrier(), nor -#else -#error Please implement AtomicPointer for this platform. - -#endif - -#undef ROCKSDB_HAVE_MEMORY_BARRIER -#undef ARCH_CPU_X86_FAMILY -#undef ARCH_CPU_ARM_FAMILY - -} // namespace port -} // namespace rocksdb - -#endif // PORT_ATOMIC_POINTER_H_ diff --git a/port/port_example.h b/port/port_example.h index f124abb06..ba14618fa 100644 --- a/port/port_example.h +++ b/port/port_example.h @@ -75,35 +75,6 @@ typedef intptr_t OnceType; #define LEVELDB_ONCE_INIT 0 extern void InitOnce(port::OnceType*, void (*initializer)()); -// A type that holds a pointer that can be read or written atomically -// (i.e., without word-tearing.) -class AtomicPointer { - private: - intptr_t rep_; - public: - // Initialize to arbitrary value - AtomicPointer(); - - // Initialize to hold v - explicit AtomicPointer(void* v) : rep_(v) { } - - // Read and return the stored pointer with the guarantee that no - // later memory access (read or write) by this thread can be - // reordered ahead of this read. - void* Acquire_Load() const; - - // Set v as the stored pointer with the guarantee that no earlier - // memory access (read or write) by this thread can be reordered - // after this store. - void Release_Store(void* v); - - // Read the stored pointer with no ordering guarantees. - void* NoBarrier_Load() const; - - // Set va as the stored pointer with no ordering guarantees. - void NoBarrier_Store(void* v); -}; - // ------------------ Compression ------------------- // Store the snappy compression of "input[0,input_length-1]" in *output. diff --git a/port/port_posix.h b/port/port_posix.h index 2e3c868b3..dae8f7219 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -55,7 +55,6 @@ #include #include #include "rocksdb/options.h" -#include "port/atomic_pointer.h" #ifndef PLATFORM_IS_LITTLE_ENDIAN #define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) diff --git a/tools/db_repl_stress.cc b/tools/db_repl_stress.cc index 5970bb684..fbe426573 100644 --- a/tools/db_repl_stress.cc +++ b/tools/db_repl_stress.cc @@ -58,7 +58,7 @@ static void DataPumpThreadBody(void* arg) { } struct ReplicationThread { - port::AtomicPointer stop; + std::atomic stop; DB* db; volatile size_t no_read; }; @@ -68,11 +68,11 @@ static void ReplicationThreadBody(void* arg) { DB* db = t->db; unique_ptr iter; SequenceNumber currentSeqNum = 1; - while (t->stop.Acquire_Load() != nullptr) { + while (!t->stop.load(std::memory_order_acquire)) { iter.reset(); Status s; while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) { - if (t->stop.Acquire_Load() == nullptr) { + if (t->stop.load(std::memory_order_acquire)) { return; } } @@ -129,11 +129,11 @@ int main(int argc, const char** argv) { ReplicationThread replThread; replThread.db = db; replThread.no_read = 0; - replThread.stop.Release_Store(env); // store something to make it non-null. + replThread.stop.store(false, std::memory_order_release); env->StartThread(ReplicationThreadBody, &replThread); while(replThread.no_read < FLAGS_num_inserts); - replThread.stop.Release_Store(nullptr); + replThread.stop.store(true, std::memory_order_release); if (replThread.no_read < dataPump.no_records) { // no. read should be => than inserted. fprintf(stderr, "No. of Record's written and read not same\nRead : %zu" diff --git a/util/env_test.cc b/util/env_test.cc index 1779f1aa0..be542e9af 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -44,30 +44,31 @@ class EnvPosixTest { }; static void SetBool(void* ptr) { - reinterpret_cast(ptr)->NoBarrier_Store(ptr); + reinterpret_cast*>(ptr) + ->store(true, std::memory_order_relaxed); } TEST(EnvPosixTest, RunImmediately) { - port::AtomicPointer called (nullptr); + std::atomic called(false); env_->Schedule(&SetBool, &called); Env::Default()->SleepForMicroseconds(kDelayMicros); - ASSERT_TRUE(called.NoBarrier_Load() != nullptr); + ASSERT_TRUE(called.load(std::memory_order_relaxed)); } TEST(EnvPosixTest, RunMany) { - port::AtomicPointer last_id (nullptr); + std::atomic last_id(0); struct CB { - port::AtomicPointer* last_id_ptr; // Pointer to shared slot - uintptr_t id; // Order# for the execution of this callback + std::atomic* last_id_ptr; // Pointer to shared slot + int id; // Order# for the execution of this callback - CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { } + CB(std::atomic* p, int i) : last_id_ptr(p), id(i) {} static void Run(void* v) { CB* cb = reinterpret_cast(v); - void* cur = cb->last_id_ptr->NoBarrier_Load(); - ASSERT_EQ(cb->id-1, reinterpret_cast(cur)); - cb->last_id_ptr->Release_Store(reinterpret_cast(cb->id)); + int cur = cb->last_id_ptr->load(std::memory_order_relaxed); + ASSERT_EQ(cb->id - 1, cur); + cb->last_id_ptr->store(cb->id, std::memory_order_release); } }; @@ -82,8 +83,8 @@ TEST(EnvPosixTest, RunMany) { env_->Schedule(&CB::Run, &cb4); Env::Default()->SleepForMicroseconds(kDelayMicros); - void* cur = last_id.Acquire_Load(); - ASSERT_EQ(4U, reinterpret_cast(cur)); + int cur = last_id.load(std::memory_order_acquire); + ASSERT_EQ(4, cur); } struct State { diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 8e3dc5826..8e5f4025d 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -8,12 +8,12 @@ #include "util/hash_linklist_rep.h" #include +#include #include "rocksdb/memtablerep.h" #include "util/arena.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "port/port.h" -#include "port/atomic_pointer.h" #include "util/histogram.h" #include "util/murmurhash.h" #include "db/memtable.h" @@ -24,7 +24,7 @@ namespace { typedef const char* Key; typedef SkipList MemtableSkipList; -typedef port::AtomicPointer Pointer; +typedef std::atomic Pointer; // A data structure used as the header of a link list of a hash bucket. struct BucketHeader { @@ -34,7 +34,9 @@ struct BucketHeader { explicit BucketHeader(void* n, uint32_t count) : next(n), num_entries(count) {} - bool IsSkipListBucket() { return next.NoBarrier_Load() == this; } + bool IsSkipListBucket() { + return next.load(std::memory_order_relaxed) == this; + } }; // A data structure used as the header of a skip list of a hash bucket. @@ -55,24 +57,23 @@ struct Node { Node* Next() { // Use an 'acquire load' so that we observe a fully initialized // version of the returned Node. - return reinterpret_cast(next_.Acquire_Load()); + return next_.load(std::memory_order_acquire); } void SetNext(Node* x) { // Use a 'release store' so that anybody who reads through this // pointer observes a fully initialized version of the inserted node. - next_.Release_Store(x); + next_.store(x, std::memory_order_release); } // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next() { - return reinterpret_cast(next_.NoBarrier_Load()); + return next_.load(std::memory_order_relaxed); } - void NoBarrier_SetNext(Node* x) { - next_.NoBarrier_Store(x); - } + void NoBarrier_SetNext(Node* x) { next_.store(x, std::memory_order_relaxed); } private: - port::AtomicPointer next_; + std::atomic next_; + public: char key[0]; }; @@ -174,7 +175,7 @@ class HashLinkListRep : public MemTableRep { // Maps slices (which are transformed user keys) to buckets of keys sharing // the same transform. - port::AtomicPointer* buckets_; + Pointer* buckets_; const uint32_t threshold_use_skiplist_; @@ -203,7 +204,7 @@ class HashLinkListRep : public MemTableRep { } Pointer* GetBucket(size_t i) const { - return static_cast(buckets_[i].Acquire_Load()); + return static_cast(buckets_[i].load(std::memory_order_acquire)); } Pointer* GetBucket(const Slice& slice) const { @@ -467,13 +468,13 @@ HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, logger_(logger), bucket_entries_logging_threshold_(bucket_entries_logging_threshold), if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) { - char* mem = arena_->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size, + char* mem = arena_->AllocateAligned(sizeof(Pointer) * bucket_size, huge_page_tlb_size, logger); - buckets_ = new (mem) port::AtomicPointer[bucket_size]; + buckets_ = new (mem) Pointer[bucket_size]; for (size_t i = 0; i < bucket_size_; ++i) { - buckets_[i].NoBarrier_Store(nullptr); + buckets_[i].store(nullptr, std::memory_order_relaxed); } } @@ -492,7 +493,7 @@ SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader( if (first_next_pointer == nullptr) { return nullptr; } - if (first_next_pointer->NoBarrier_Load() == nullptr) { + if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) { // Single entry bucket return nullptr; } @@ -502,8 +503,8 @@ SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader( assert(header->num_entries > threshold_use_skiplist_); auto* skip_list_bucket_header = reinterpret_cast(header); - assert(skip_list_bucket_header->Counting_header.next.NoBarrier_Load() == - header); + assert(skip_list_bucket_header->Counting_header.next.load( + std::memory_order_relaxed) == header); return skip_list_bucket_header; } assert(header->num_entries <= threshold_use_skiplist_); @@ -514,7 +515,7 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { if (first_next_pointer == nullptr) { return nullptr; } - if (first_next_pointer->NoBarrier_Load() == nullptr) { + if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) { // Single entry bucket return reinterpret_cast(first_next_pointer); } @@ -522,7 +523,8 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { BucketHeader* header = reinterpret_cast(first_next_pointer); if (!header->IsSkipListBucket()) { assert(header->num_entries <= threshold_use_skiplist_); - return reinterpret_cast(header->next.NoBarrier_Load()); + return reinterpret_cast( + header->next.load(std::memory_order_relaxed)); } assert(header->num_entries > threshold_use_skiplist_); return nullptr; @@ -534,19 +536,20 @@ void HashLinkListRep::Insert(KeyHandle handle) { Slice internal_key = GetLengthPrefixedSlice(x->key); auto transformed = GetPrefix(internal_key); auto& bucket = buckets_[GetHash(transformed)]; - Pointer* first_next_pointer = static_cast(bucket.NoBarrier_Load()); + Pointer* first_next_pointer = + static_cast(bucket.load(std::memory_order_relaxed)); if (first_next_pointer == nullptr) { // Case 1. empty bucket // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(nullptr); - bucket.Release_Store(x); + bucket.store(x, std::memory_order_release); return; } BucketHeader* header = nullptr; - if (first_next_pointer->NoBarrier_Load() == nullptr) { + if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) { // Case 2. only one entry in the bucket // Need to convert to a Counting bucket and turn to case 4. Node* first = reinterpret_cast(first_next_pointer); @@ -557,7 +560,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { // think the node is a bucket header. auto* mem = arena_->AllocateAligned(sizeof(BucketHeader)); header = new (mem) BucketHeader(first, 1); - bucket.Release_Store(header); + bucket.store(header, std::memory_order_release); } else { header = reinterpret_cast(first_next_pointer); if (header->IsSkipListBucket()) { @@ -585,7 +588,8 @@ void HashLinkListRep::Insert(KeyHandle handle) { // Case 3. number of entries reaches the threshold so need to convert to // skip list. LinkListIterator bucket_iter( - this, reinterpret_cast(first_next_pointer->NoBarrier_Load())); + this, reinterpret_cast( + first_next_pointer->load(std::memory_order_relaxed))); auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader)); SkipListBucketHeader* new_skip_list_header = new (mem) SkipListBucketHeader(compare_, arena_, header->num_entries + 1); @@ -599,11 +603,12 @@ void HashLinkListRep::Insert(KeyHandle handle) { // insert the new entry skip_list.Insert(x->key); // Set the bucket - bucket.Release_Store(new_skip_list_header); + bucket.store(new_skip_list_header, std::memory_order_release); } else { // Case 5. Need to insert to the sorted linked list without changing the // header. - Node* first = reinterpret_cast(header->next.NoBarrier_Load()); + Node* first = + reinterpret_cast(header->next.load(std::memory_order_relaxed)); assert(first != nullptr); // Advance counter unless the bucket needs to be advanced to skip list. // In that case, we need to make sure the previous count never exceeds @@ -640,7 +645,7 @@ void HashLinkListRep::Insert(KeyHandle handle) { if (prev) { prev->SetNext(x); } else { - header->next.Release_Store(static_cast(x)); + header->next.store(static_cast(x), std::memory_order_release); } } } diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index 1c7a459bd..f410350e7 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -7,12 +7,13 @@ #ifndef ROCKSDB_LITE #include "util/hash_skiplist_rep.h" +#include + #include "rocksdb/memtablerep.h" #include "util/arena.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "port/port.h" -#include "port/atomic_pointer.h" #include "util/murmurhash.h" #include "db/memtable.h" #include "db/skiplist.h" @@ -54,7 +55,7 @@ class HashSkipListRep : public MemTableRep { // Maps slices (which are transformed user keys) to buckets of keys sharing // the same transform. - port::AtomicPointer* buckets_; + std::atomic* buckets_; // The user-supplied transform whose domain is the user keys. const SliceTransform* transform_; @@ -67,7 +68,7 @@ class HashSkipListRep : public MemTableRep { return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_; } inline Bucket* GetBucket(size_t i) const { - return static_cast(buckets_[i].Acquire_Load()); + return buckets_[i].load(std::memory_order_acquire); } inline Bucket* GetBucket(const Slice& slice) const { return GetBucket(GetHash(slice)); @@ -229,12 +230,11 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, transform_(transform), compare_(compare), arena_(arena) { - auto mem = - arena->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size); - buckets_ = new (mem) port::AtomicPointer[bucket_size]; + auto mem = arena->AllocateAligned(sizeof(std::atomic) * bucket_size); + buckets_ = new (mem) std::atomic[bucket_size]; for (size_t i = 0; i < bucket_size_; ++i) { - buckets_[i].NoBarrier_Store(nullptr); + buckets_[i].store(nullptr, std::memory_order_relaxed); } } @@ -249,7 +249,7 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( auto addr = arena_->AllocateAligned(sizeof(Bucket)); bucket = new (addr) Bucket(compare_, arena_, skiplist_height_, skiplist_branching_factor_); - buckets_[hash].Release_Store(static_cast(bucket)); + buckets_[hash].store(bucket, std::memory_order_release); } return bucket; }