From 48bc0c6ad39b69ba118435e1bcfc69b23606cb10 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Sun, 23 Mar 2014 21:49:14 -0700 Subject: [PATCH 1/5] [RocksDB] Fix a race condition in GetSortedWalFiles Summary: This patch fixed a race condition where a log file is moved to archived dir in the middle of GetSortedWalFiles. Without the fix, the log file would be missed in the result, which leads to transaction log iterator gap. A test utility SyncPoint is added to help reproducing the race condition. Test Plan: TransactionLogIteratorRace; make check Reviewers: dhruba, ljin Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D17121 --- db/db_filesnapshot.cc | 48 ++++++++++++++++++++++---- db/db_impl.cc | 19 +++++------ db/db_impl.h | 6 ++-- db/db_test.cc | 46 +++++++++++++++++++++++++ util/sync_point.cc | 62 +++++++++++++++++++++++++++++++++ util/sync_point.h | 79 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 241 insertions(+), 19 deletions(-) create mode 100644 util/sync_point.cc create mode 100644 util/sync_point.h diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 04d6d0e17..89db22f43 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -17,6 +17,7 @@ #include "rocksdb/env.h" #include "port/port.h" #include "util/mutexlock.h" +#include "util/sync_point.h" namespace rocksdb { @@ -95,20 +96,55 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { - // First get sorted files in archive dir, then append sorted files from main - // dir to maintain sorted order + // First get sorted files in db dir, then get sorted files from archived + // dir, to avoid a race condition where a log file is moved to archived + // dir in between. + Status s; + // list wal files in main db dir. 
+ VectorLogPtr logs; + s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile); + if (!s.ok()) { + return s; + } + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1"); + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2"); + + files.clear(); // list wal files in archive dir. - Status s; std::string archivedir = ArchivalDirectory(options_.wal_dir); if (env_->FileExists(archivedir)) { - s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); if (!s.ok()) { return s; } } - // list wal files in main db dir. - return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); + + uint64_t latest_archived_log_number = 0; + if (!files.empty()) { + latest_archived_log_number = files.back()->LogNumber(); + Log(options_.info_log, "Latest Archived log: %lu", + latest_archived_log_number); + } + + files.reserve(files.size() + logs.size()); + for (auto& log : logs) { + if (log->LogNumber() > latest_archived_log_number) { + files.push_back(std::move(log)); + } else { + // When the race condition happens, we could see the + // same log in both db dir and archived dir. Simply + // ignore the one in db dir. Note that, if we read + // archived dir first, we would have missed the log file. 
+ Log(options_.info_log, "%s already moved to archive", + log->PathName().c_str()); + } + } + + return s; } } diff --git a/db/db_impl.cc b/db/db_impl.cc index bb1f839a9..b813efc49 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -64,6 +64,7 @@ #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -872,7 +873,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1"); Status s = env_->RenameFile(fname, archived_log_name); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2"); Log(options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); @@ -1020,7 +1025,7 @@ void DBImpl::PurgeObsoleteWALFiles() { size_t files_del_num = log_files_num - files_keep_num; VectorLogPtr archived_logs; - AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); if (files_del_num > archived_logs.size()) { Log(options_.info_log, "Trying to delete more archived log files than " @@ -1791,20 +1796,14 @@ struct CompareLogByPointer { } }; -Status DBImpl::AppendSortedWalsOfType(const std::string& path, +Status DBImpl::GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files, WalFileType log_type) { std::vector all_files; const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - log_files.reserve(log_files.size() + all_files.size()); - VectorLogPtr::iterator pos_start; - if (!log_files.empty()) { - pos_start = log_files.end() - 1; - } else { - pos_start = 
log_files.begin(); - } + log_files.reserve(all_files.size()); for (const auto& f : all_files) { uint64_t number; FileType type; @@ -1830,7 +1829,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path, } } CompareLogByPointer compare_log_files; - std::sort(pos_start, log_files.end(), compare_log_files); + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 4cfb6ecaf..3eb557a02 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -394,9 +394,9 @@ class DBImpl : public DB { void PurgeObsoleteWALFiles(); - Status AppendSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType type); + Status GetSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); // Requires: all_logs should be sorted with earliest log file first // Retains all log files in all_logs which contain updates with seq no. diff --git a/db/db_test.cc b/db/db_test.cc index 0695b5cc7..f707eb97c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,6 +37,7 @@ #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" +#include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { @@ -5189,6 +5190,51 @@ TEST(DBTest, TransactionLogIterator) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorRace) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" }, + { "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", 
DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); +} + TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { do { Options options = OptionsForLogIterTest(); diff --git a/util/sync_point.cc b/util/sync_point.cc new file mode 100644 index 000000000..5d0ac2dd6 --- /dev/null +++ b/util/sync_point.cc @@ -0,0 +1,62 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#include "util/sync_point.h" + +namespace rocksdb { + +SyncPoint* SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +void SyncPoint::LoadDependency(const std::vector& dependencies) { + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } +} + +bool SyncPoint::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::EnableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = true; +} + +void SyncPoint::DisableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = false; +} + +void SyncPoint::ClearTrace() { + std::unique_lock lock(mutex_); + cleared_points_.clear(); +} + +void SyncPoint::Process(const std::string& point) { + std::unique_lock lock(mutex_); + + if (!enabled_) return; + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + } + + cleared_points_.insert(point); + cv_.notify_all(); +} + +} // namespace rocksdb diff --git a/util/sync_point.h b/util/sync_point.h new file mode 100644 index 000000000..3cc892370 --- /dev/null +++ b/util/sync_point.h @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +// This class provides facility to reproduce race conditions deterministically +// in unit tests. +// Developer could specify sync points in the codebase via TEST_SYNC_POINT. 
+// Each sync point represents a position in the execution stream of a thread. +// In the unit test, 'Happens After' relationship among sync points could be +// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of +// threads execution. +// Refer to (DBTest,TransactionLogIteratorRace), for an example use case. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + struct Dependency { + std::string predecessor; + std::string successor; + }; + // call once at the beginning of a test to setup the dependency between + // sync points + void LoadDependency(const std::vector& dependencies); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed. + void Process(const std::string& point); + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + private: + bool PredecessorsAllCleared(const std::string& point); + + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + bool enabled_ = false; +}; + +} // namespace rocksdb + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependency on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to re-produce race conditions between threads. +// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is no op in release build. 
+#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#else +#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x) +#endif From 3a30b5b0be5e3ba395c098b55e0532855b93af2e Mon Sep 17 00:00:00 2001 From: Thomas Adam Date: Thu, 3 Apr 2014 08:59:01 +0200 Subject: [PATCH 2/5] [C-API] added "rocksdb_options_set_plain_table_factory" to make it possible to use plain table factory --- db/c.cc | 13 +++++++++++++ db/c_test.c | 1 + include/rocksdb/c.h | 1 + 3 files changed, 15 insertions(+) diff --git a/db/c.cc b/db/c.cc index 2e55c0ea1..2b3e5e538 100644 --- a/db/c.cc +++ b/db/c.cc @@ -25,6 +25,7 @@ #include "rocksdb/universal_compaction.h" #include "rocksdb/statistics.h" #include "rocksdb/slice_transform.h" +#include "rocksdb/table.h" using rocksdb::Cache; using rocksdb::Comparator; @@ -1003,6 +1004,18 @@ void rocksdb_options_set_hash_link_list_rep( opt->rep.memtable_factory.reset(factory); } +void rocksdb_options_set_plain_table_factory( + rocksdb_options_t *opt, uint32_t user_key_len, int bloom_bits_per_key, + double hash_table_ratio, size_t index_sparseness) { + static rocksdb::TableFactory* factory = 0; + if (!factory) { + factory = rocksdb::NewPlainTableFactory( + user_key_len, bloom_bits_per_key, + hash_table_ratio, index_sparseness); + } + opt->rep.table_factory.reset(factory); +} + void rocksdb_options_set_max_successive_merges( rocksdb_options_t* opt, size_t v) { opt->rep.max_successive_merges = v; diff --git a/db/c_test.c b/db/c_test.c index 4a7957b14..e6c5a9e67 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -443,6 +443,7 @@ int main(int argc, char** argv) { rocksdb_options_set_filter_policy(options, policy); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); + rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16); db = rocksdb_open(options, dbname, &err); CheckNoError(err); diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 
a6bc90085..74ed6a340 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -348,6 +348,7 @@ extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*); extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); extern void rocksdb_options_set_hash_skip_list_rep(rocksdb_options_t*, size_t, int32_t, int32_t); extern void rocksdb_options_set_hash_link_list_rep(rocksdb_options_t*, size_t); +extern void rocksdb_options_set_plain_table_factory(rocksdb_options_t*, uint32_t, int, double, size_t); extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n); extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec); From 98422cba77b0903794cffbe2b9742a4b7927c768 Mon Sep 17 00:00:00 2001 From: Thomas Adam Date: Thu, 3 Apr 2014 10:47:07 +0200 Subject: [PATCH 3/5] [C-API] implemented more options --- db/c.cc | 69 ++++++++++++++++++++++++++++++++++++++++----- include/rocksdb/c.h | 17 +++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/db/c.cc b/db/c.cc index 2b3e5e538..b566daf64 100644 --- a/db/c.cc +++ b/db/c.cc @@ -32,6 +32,7 @@ using rocksdb::Comparator; using rocksdb::CompressionType; using rocksdb::DB; using rocksdb::Env; +using rocksdb::InfoLogLevel; using rocksdb::FileLock; using rocksdb::FilterPolicy; using rocksdb::FlushOptions; @@ -657,6 +658,11 @@ void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) { } } +void rocksdb_options_set_info_log_level( + rocksdb_options_t* opt, int v) { + opt->rep.info_log_level = static_cast(v); +} + void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) { opt->rep.write_buffer_size = s; } @@ -715,6 +721,14 @@ void rocksdb_options_set_max_grandparent_overlap_factor( opt->rep.max_grandparent_overlap_factor = n; } +void rocksdb_options_set_max_bytes_for_level_multiplier_additional( + rocksdb_options_t* opt, int* level_values, size_t num_levels) { + 
opt->rep.max_bytes_for_level_multiplier_additional.resize(num_levels); + for (size_t i = 0; i < num_levels; ++i) { + opt->rep.max_bytes_for_level_multiplier_additional[i] = level_values[i]; + } +} + void rocksdb_options_enable_statistics(rocksdb_options_t* opt) { opt->rep.statistics = rocksdb::CreateDBStatistics(); } @@ -858,6 +872,24 @@ void rocksdb_options_set_advise_random_on_open( opt->rep.advise_random_on_open = v; } +void rocksdb_options_set_access_hint_on_compaction_start( + rocksdb_options_t* opt, int v) { + switch(v) { + case 0: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::NONE; + break; + case 1: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::NORMAL; + break; + case 2: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::SEQUENTIAL; + break; + case 3: + opt->rep.access_hint_on_compaction_start = rocksdb::Options::WILLNEED; + break; + } +} + void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t* opt, unsigned char v) { opt->rep.use_adaptive_mutex = v; @@ -868,6 +900,11 @@ void rocksdb_options_set_bytes_per_sync( opt->rep.bytes_per_sync = v; } +void rocksdb_options_set_verify_checksums_in_compaction( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.verify_checksums_in_compaction = v; +} + void rocksdb_options_set_filter_deletes( rocksdb_options_t* opt, unsigned char v) { opt->rep.filter_deletes = v; @@ -1021,6 +1058,31 @@ void rocksdb_options_set_max_successive_merges( opt->rep.max_successive_merges = v; } +void rocksdb_options_set_min_partial_merge_operands( + rocksdb_options_t* opt, uint32_t v) { + opt->rep.min_partial_merge_operands = v; +} + +void rocksdb_options_set_bloom_locality( + rocksdb_options_t* opt, uint32_t v) { + opt->rep.bloom_locality = v; +} + +void rocksdb_options_set_allow_thread_local( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.allow_thread_local = v; +} + +void rocksdb_options_set_inplace_update_support( + rocksdb_options_t* opt, unsigned char v) { + 
opt->rep.inplace_update_support = v; +} + +void rocksdb_options_set_inplace_update_num_locks( + rocksdb_options_t* opt, size_t v) { + opt->rep.inplace_update_num_locks = v; +} + void rocksdb_options_set_compaction_style(rocksdb_options_t *opt, int style) { opt->rep.compaction_style = static_cast(style); } @@ -1035,21 +1097,14 @@ DB::OpenForReadOnly DB::MultiGet DB::KeyMayExist DB::GetOptions -DB::GetLiveFiles DB::GetSortedWalFiles DB::GetLatestSequenceNumber DB::GetUpdatesSince -DB::DeleteFile DB::GetDbIdentity DB::RunManualCompaction custom cache compaction_filter -max_bytes_for_level_multiplier_additional -access_hint_on_compaction_start -table_factory table_properties_collectors -inplace_update_support -inplace_update_num_locks */ rocksdb_comparator_t* rocksdb_comparator_create( diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 74ed6a340..7d4a374d9 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -243,6 +243,7 @@ extern void rocksdb_options_set_paranoid_checks( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_env(rocksdb_options_t*, rocksdb_env_t*); extern void rocksdb_options_set_info_log(rocksdb_options_t*, rocksdb_logger_t*); +extern void rocksdb_options_set_info_log_level(rocksdb_options_t*, int); extern void rocksdb_options_set_write_buffer_size(rocksdb_options_t*, size_t); extern void rocksdb_options_set_max_open_files(rocksdb_options_t*, int); extern void rocksdb_options_set_cache(rocksdb_options_t*, rocksdb_cache_t*); @@ -275,6 +276,8 @@ extern void rocksdb_options_set_expanded_compaction_factor( rocksdb_options_t*, int); extern void rocksdb_options_set_max_grandparent_overlap_factor( rocksdb_options_t*, int); +extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional( + rocksdb_options_t*, int* level_values, size_t num_levels); extern void rocksdb_options_enable_statistics(rocksdb_options_t*); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); @@ -330,10 +333,14 @@ 
extern void rocksdb_options_set_block_size_deviation( rocksdb_options_t*, int); extern void rocksdb_options_set_advise_random_on_open( rocksdb_options_t*, unsigned char); +extern void rocksdb_options_set_access_hint_on_compaction_start( + rocksdb_options_t*, int); extern void rocksdb_options_set_use_adaptive_mutex( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_bytes_per_sync( rocksdb_options_t*, uint64_t); +extern void rocksdb_options_set_verify_checksums_in_compaction( + rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_filter_deletes( rocksdb_options_t*, unsigned char); extern void rocksdb_options_set_max_sequential_skip_in_iterations( @@ -361,6 +368,16 @@ extern void rocksdb_options_set_memtable_prefix_bloom_probes( rocksdb_options_t*, uint32_t); extern void rocksdb_options_set_max_successive_merges( rocksdb_options_t*, size_t); +extern void rocksdb_options_set_min_partial_merge_operands( + rocksdb_options_t*, uint32_t); +extern void rocksdb_options_set_bloom_locality( + rocksdb_options_t*, uint32_t); +extern void rocksdb_options_set_allow_thread_local( + rocksdb_options_t*, unsigned char); +extern void rocksdb_options_set_inplace_update_support( + rocksdb_options_t*, unsigned char); +extern void rocksdb_options_set_inplace_update_num_locks( + rocksdb_options_t*, size_t); enum { rocksdb_no_compression = 0, From c0b9fa8b3ec1543c998c8886bc30f1e22fc0a30b Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Apr 2014 10:21:46 -0700 Subject: [PATCH 4/5] Add script auto_sanity_test.sh to perform auto sanity test Summary: Add script auto_sanity_test.sh to perform auto sanity test usage: auto_sanity_test.sh [new_commit] [old_commit] Running without commit parameter will do the sanity test with the latest and the latest 10 commit. 
Test Plan: ./auto_sanity_test.sh Reviewers: haobo, igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17397 --- tools/auto_sanity_test.sh | 71 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100755 tools/auto_sanity_test.sh diff --git a/tools/auto_sanity_test.sh b/tools/auto_sanity_test.sh new file mode 100755 index 000000000..2d63c0a85 --- /dev/null +++ b/tools/auto_sanity_test.sh @@ -0,0 +1,71 @@ +TMP_DIR="/tmp/rocksdb-sanity-test" + +if [ "$#" -lt 2 ]; then + echo "usage: ./auto_sanity_test.sh [new_commit] [old_commit]" + echo "Missing either [new_commit] or [old_commit], perform sanity check with the latest and 10th latest commits." + recent_commits=`git log | grep -e "^commit [a-z0-9]\+$"| head -n10 | sed -e 's/commit //g'` + commit_new=`echo "$recent_commits" | head -n1` + commit_old=`echo "$recent_commits" | tail -n1` + echo "the most recent commits are:" + echo "$recent_commits" +else + commit_new=$1 + commit_old=$2 +fi + +if [ ! -d $TMP_DIR ]; then + mkdir $TMP_DIR +fi +dir_new="${TMP_DIR}/${commit_new}" +dir_old="${TMP_DIR}/${commit_old}" + +function makestuff() { + echo "make clean" + make clean > /dev/null + echo "make db_sanity_test -j32" + make db_sanity_test -j32 > /dev/null + if [ $? -ne 0 ]; then + echo "[ERROR] Failed to perform 'make db_sanity_test'" + exit 1 + fi +} + +rm -r -f $dir_new +rm -r -f $dir_old + +echo "Running db sanity check with commits $commit_new and $commit_old." 
+ +echo "=============================================================" +echo "Making build $commit_new" +makestuff +mv db_sanity_test new_db_sanity_test +echo "Creating db based on the new commit --- $commit_new" +./new_db_sanity_test $dir_new create + +echo "=============================================================" +echo "Making build $commit_old" +makestuff +mv db_sanity_test old_db_sanity_test +echo "Creating db based on the old commit --- $commit_old" +./old_db_sanity_test $dir_old create + +echo "=============================================================" +echo "Verifying new db $dir_new using the old commit --- $commit_old" +./old_db_sanity_test $dir_new verify +if [ $? -ne 0 ]; then + echo "[ERROR] Verification of $dir_new using commit $commit_old failed." + exit 2 +fi + +echo "=============================================================" +echo "Verifying old db $dir_old using the new commit --- $commit_new" +./new_db_sanity_test $dir_old verify +if [ $? -ne 0 ]; then + echo "[ERROR] Verification of $dir_old using commit $commit_new failed." + exit 2 +fi + +rm old_db_sanity_test +rm new_db_sanity_test + +echo "Auto sanity test passed!" From b9767d0e090d6e14c62c166bf59a877e2f39693d Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 2 Apr 2014 21:49:51 -0700 Subject: [PATCH 5/5] Move several more logging inside DB mutex to log buffer Summary: Move several some common logging still in DB mutex to log buffer. 
Test Plan: make all check Reviewers: haobo, igor, ljin, nkg- Reviewed By: nkg- CC: nkg-, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D17439 --- db/compaction_picker.cc | 2 +- db/db_impl.cc | 43 +++++++++++++++++++++-------------------- db/db_impl.h | 3 ++- db/memtable_list.cc | 20 +++++++++---------- db/memtable_list.h | 3 ++- 5 files changed, 36 insertions(+), 35 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index ccdbce72b..c39b6d328 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -587,7 +587,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, options_->level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { - Log(options_->info_log, "Universal: compacting for file num\n"); + LogToBuffer(log_buffer, "Universal: compacting for file num\n"); } } } diff --git a/db/db_impl.cc b/db/db_impl.cc index b813efc49..0f1d227cd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1460,7 +1460,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, s = imm_.InstallMemtableFlushResults( mems, versions_.get(), &mutex_, options_.info_log.get(), file_number, pending_outputs_, &deletion_state.memtables_to_free, - db_directory_.get()); + db_directory_.get(), log_buffer); } if (s.ok()) { @@ -2013,9 +2013,10 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, LogBuffer* log_buffer) { Status stat; while (stat.ok() && imm_.IsFlushPending()) { - Log(options_.info_log, - "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", - options_.max_background_flushes - bg_flush_scheduled_); + LogToBuffer(log_buffer, + "BackgroundCallFlush doing FlushMemTableToOutputFile, " + "flush slots available %d", + options_.max_background_flushes - bg_flush_scheduled_); stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer); } return stat; @@ -2461,7 +2462,8 @@ Status 
DBImpl::FinishCompactionOutputFile(CompactionState* compact, } -Status DBImpl::InstallCompactionResults(CompactionState* compact) { +Status DBImpl::InstallCompactionResults(CompactionState* compact, + LogBuffer* log_buffer) { mutex_.AssertHeld(); // paranoia: verify that the files that we started with @@ -2477,11 +2479,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return Status::Corruption("Compaction input files inconsistent"); } - Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->level() + 1, + LogToBuffer( + log_buffer, "Compacted %d@%d + %d@%d files => %lld bytes", + compact->compaction->num_input_files(0), compact->compaction->level(), + compact->compaction->num_input_files(1), compact->compaction->level() + 1, static_cast(compact->total_bytes)); // Add compaction outputs @@ -2905,17 +2906,16 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, bool prefix_initialized = false; int64_t imm_micros = 0; // Micros spent doing imm_ compactions - Log(options_.info_log, - "Compacting %d@%d + %d@%d files, score %.2f slots available %d", - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->output_level(), - compact->compaction->score(), - options_.max_background_compactions - bg_compaction_scheduled_); + LogToBuffer(log_buffer, + "Compacting %d@%d + %d@%d files, score %.2f slots available %d", + compact->compaction->num_input_files(0), + compact->compaction->level(), + compact->compaction->num_input_files(1), + compact->compaction->output_level(), compact->compaction->score(), + options_.max_background_compactions - bg_compaction_scheduled_); char scratch[2345]; compact->compaction->Summary(scratch, sizeof(scratch)); - Log(options_.info_log, "Compaction start summary: %s\n", scratch); + 
LogToBuffer(log_buffer, "Compaction start summary: %s\n", scratch); assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == nullptr); @@ -3173,11 +3173,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, ReleaseCompactionUnusedFileNumbers(compact); if (status.ok()) { - status = InstallCompactionResults(compact); + status = InstallCompactionResults(compact, log_buffer); InstallSuperVersion(deletion_state); } Version::LevelSummaryStorage tmp; - Log(options_.info_log, + LogToBuffer( + log_buffer, "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s\n", diff --git a/db/db_impl.h b/db/db_impl.h index 3eb557a02..d8ac98cee 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -388,7 +388,8 @@ class DBImpl : public DB { Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); - Status InstallCompactionResults(CompactionState* compact); + Status InstallCompactionResults(CompactionState* compact, + LogBuffer* log_buffer); void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index ebda34802..3c502c6de 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -11,6 +11,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "util/coding.h" +#include "util/log_buffer.h" namespace rocksdb { @@ -140,10 +141,10 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - const autovector& mems, VersionSet* vset, - port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs, autovector* to_delete, - Directory* db_directory) { + const autovector& mems, VersionSet* vset, 
port::Mutex* mu, + Logger* info_log, uint64_t file_number, std::set& pending_outputs, + autovector* to_delete, Directory* db_directory, + LogBuffer* log_buffer) { mu->AssertHeld(); // flush was sucessful @@ -173,9 +174,8 @@ Status MemTableList::InstallMemtableFlushResults( break; } - Log(info_log, - "Level-0 commit table #%lu started", - (unsigned long)m->file_number_); + LogToBuffer(log_buffer, "Level-0 commit table #%lu started", + (unsigned long)m->file_number_); // this can release and reacquire the mutex. s = vset->LogAndApply(&m->edit_, mu, db_directory); @@ -189,10 +189,8 @@ Status MemTableList::InstallMemtableFlushResults( uint64_t mem_id = 1; // how many memtables has been flushed. do { if (s.ok()) { // commit new state - Log(info_log, - "Level-0 commit table #%lu: memtable #%lu done", - (unsigned long)m->file_number_, - (unsigned long)mem_id); + LogToBuffer(log_buffer, "Level-0 commit table #%lu: memtable #%lu done", + (unsigned long)m->file_number_, (unsigned long)mem_id); current_->Remove(m); assert(m->file_number_ > 0); diff --git a/db/memtable_list.h b/db/memtable_list.h index 3c87d4eee..0bf376e55 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -104,7 +104,8 @@ class MemTableList { Logger* info_log, uint64_t file_number, std::set& pending_outputs, autovector* to_delete, - Directory* db_directory); + Directory* db_directory, + LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add().