From d3759ca121d16f16a58666dad4b644113df1b9e9 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Mon, 27 Aug 2012 00:50:26 -0700 Subject: [PATCH] fix db_test error with scribe logger turned on Summary: as subject Test Plan: db_test Reviewers: dhruba Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D4929 --- db/db_impl.cc | 5 ++++- db/db_impl.h | 6 +++++- db/db_stats_logger.cc | 43 +++++++++++++++++++++++++++++------------ db/version_set.cc | 7 +++++-- scribe/scribe_logger.cc | 30 +++++++++++----------------- scribe/scribe_logger.h | 4 +--- 6 files changed, 57 insertions(+), 38 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 17c52454e..6876e891b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -134,6 +134,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) log_(NULL), tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), + bg_logstats_scheduled_(false), manual_compaction_(NULL), logger_(NULL) { mem_->Ref(); @@ -168,7 +169,7 @@ DBImpl::~DBImpl() { // Wait for background work to finish mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { + while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { bg_cv_.Wait(); } mutex_.Unlock(); @@ -192,6 +193,8 @@ DBImpl::~DBImpl() { if (owns_cache_) { delete options_.block_cache; } + + delete logger_; } Status DBImpl::NewDB() { diff --git a/db/db_impl.h b/db/db_impl.h index e1e7578bc..b618af809 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -115,7 +115,8 @@ class DBImpl : public DB { Status WaitForCompactMemTable(); void MaybeScheduleLogDBDeployStats(); - static void LogDBDeployStats(void* db); + static void BGLogDBDeployStats(void* db); + void LogDBDeployStats(); void MaybeScheduleCompaction(); static void BGWork(void* db); @@ -169,6 +170,9 @@ class DBImpl : public DB { // Has a background compaction been scheduled or is running? bool bg_compaction_scheduled_; + // Has a background stats log thread scheduled? + bool bg_logstats_scheduled_; + // Information for a manual compaction struct ManualCompaction { int level; diff --git a/db/db_stats_logger.cc b/db/db_stats_logger.cc index 4cac0c95b..c189df91d 100644 --- a/db/db_stats_logger.cc +++ b/db/db_stats_logger.cc @@ -8,6 +8,8 @@ #include "db/version_set.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "port/port.h" +#include "util/mutexlock.h" namespace leveldb { @@ -18,7 +20,8 @@ void DBImpl::MaybeScheduleLogDBDeployStats() { || host_name_.empty()) { return; } - if (shutting_down_.Acquire_Load()) { + + if(bg_logstats_scheduled_ || shutting_down_.Acquire_Load()) { // Already scheduled } else { int64_t current_ts = 0; @@ -30,43 +33,59 @@ void DBImpl::MaybeScheduleLogDBDeployStats() { return; } last_log_ts = current_ts; - env_->Schedule(&DBImpl::LogDBDeployStats, this); + bg_logstats_scheduled_ = true; + env_->Schedule(&DBImpl::BGLogDBDeployStats, this); } } -void DBImpl::LogDBDeployStats(void* db) { +void DBImpl::BGLogDBDeployStats(void* db) { DBImpl* db_inst = reinterpret_cast(db); + db_inst->LogDBDeployStats(); +} + +void DBImpl::LogDBDeployStats() { + mutex_.Lock(); - if (db_inst->shutting_down_.Acquire_Load()) { + if (shutting_down_.Acquire_Load()) { + bg_logstats_scheduled_ = false; + bg_cv_.SignalAll(); + mutex_.Unlock(); return; } + mutex_.Unlock(); + std::string version_info; version_info += boost::lexical_cast(kMajorVersion); version_info += "."; version_info += boost::lexical_cast(kMinorVersion); std::string data_dir; - db_inst->env_->GetAbsolutePath(db_inst->dbname_, &data_dir); + env_->GetAbsolutePath(dbname_, &data_dir); uint64_t file_total_size = 0; uint32_t file_total_num = 0; - for (int i = 0; i < db_inst->versions_->NumberLevels(); i++) { - file_total_num += db_inst->versions_->NumLevelFiles(i); - file_total_size += db_inst->versions_->NumLevelBytes(i); + for (int i = 0; i < versions_->NumberLevels(); i++) { + file_total_num += versions_->NumLevelFiles(i); + file_total_size += versions_->NumLevelBytes(i); } VersionSet::LevelSummaryStorage scratch; - const char* file_num_summary = db_inst->versions_->LevelSummary(&scratch); + const char* file_num_summary = versions_->LevelSummary(&scratch); std::string file_num_per_level(file_num_summary); - const char* file_size_summary = db_inst->versions_->LevelDataSizeSummary( + const char* file_size_summary = versions_->LevelDataSizeSummary( &scratch); std::string data_size_per_level(file_num_summary); int64_t unix_ts; - db_inst->env_->GetCurrentTime(&unix_ts); + env_->GetCurrentTime(&unix_ts); - db_inst->logger_->Log_Deploy_Stats(version_info, db_inst->host_name_, + logger_->Log_Deploy_Stats(version_info, host_name_, data_dir, file_total_size, file_total_num, file_num_per_level, data_size_per_level, unix_ts); + + mutex_.Lock(); + bg_logstats_scheduled_ = false; + bg_cv_.SignalAll(); + mutex_.Unlock(); } } diff --git a/db/version_set.cc b/db/version_set.cc index 56a78e8f5..a7a2e2284 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -22,7 +22,7 @@ namespace leveldb { static int64_t TotalFileSize(const std::vector& files) { int64_t sum = 0; - for (size_t i = 0; i < files.size(); i++) { + for (size_t i = 0; i < files.size() && files[i]; i++) { sum += files[i]->file_size; } return sum; @@ -1223,7 +1223,10 @@ void VersionSet::AddLiveFiles(std::set* live) { int64_t VersionSet::NumLevelBytes(int level) const { assert(level >= 0); assert(level < NumberLevels()); - return TotalFileSize(current_->files_[level]); + if(current_ && level < current_->files_->size()) + return TotalFileSize(current_->files_[level]); + else + return 0; } int64_t VersionSet::MaxNextLevelOverlappingBytes() { diff --git a/scribe/scribe_logger.cc b/scribe/scribe_logger.cc index 5c10e93fb..e39f7f4b4 100644 --- a/scribe/scribe_logger.cc +++ b/scribe/scribe_logger.cc @@ -6,12 +6,11 @@ const std::string ScribeLogger::COL_SEPERATOR = "\x1"; const std::string ScribeLogger::DEPLOY_STATS_CATEGORY = "leveldb_deploy_stats"; ScribeLogger::ScribeLogger(const std::string& host, int port, - int retry_times, uint32_t retry_intervals, int batch_size) + int retry_times, uint32_t retry_intervals) : host_(host), port_(port), retry_times_(retry_times), - retry_intervals_ (retry_intervals), - batch_size_ (batch_size) { + retry_intervals_ (retry_intervals) { shared_ptr socket(new TSocket(host_, port_)); shared_ptr framedTransport(new TFramedTransport(socket)); framedTransport->open(); @@ -25,23 +24,16 @@ void ScribeLogger::Log(const std::string& category, entry.category = category; entry.message = message; - logger_mutex_.Lock(); - logs_.push_back(entry); - - if (logs_.size() >= batch_size_) { - ResultCode ret = scribe_client_->Log(logs_); - int retries_left = retry_times_; - while (ret == TRY_LATER && retries_left > 0) { - Env::Default()->SleepForMicroseconds(retry_intervals_); - ret = scribe_client_->Log(logs_); - retries_left--; - } + std::vector logs; + logs.push_back(entry); - // Clear the local messages if either successfully write out - // or has failed in the last 10 calls. - if (ret == OK || logs_.size() > batch_size_ * 5) { - logs_.clear(); - } + logger_mutex_.Lock(); + ResultCode ret = scribe_client_->Log(logs); + int retries_left = retry_times_; + while (ret == TRY_LATER && retries_left > 0) { + Env::Default()->SleepForMicroseconds(retry_intervals_); + ret = scribe_client_->Log(logs); + retries_left--; } logger_mutex_.Unlock(); diff --git a/scribe/scribe_logger.h b/scribe/scribe_logger.h index 2e56a6c18..bd986eeb7 100644 --- a/scribe/scribe_logger.h +++ b/scribe/scribe_logger.h @@ -34,7 +34,6 @@ private: int batch_size_; scribeClient* scribe_client_; - std::vector logs_; port::Mutex logger_mutex_; int retry_times_; @@ -48,8 +47,7 @@ public: static const std::string DEPLOY_STATS_CATEGORY; ScribeLogger(const std::string& host, int port, - int retry_times=3, uint32_t retry_intervals=1000000, - int batch_size=1); + int retry_times=3, uint32_t retry_intervals=1000000); virtual ~ScribeLogger(); virtual void Log(const std::string& category, const std::string& message);