diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index a41ea7d4b..40b57b190 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -226,6 +226,73 @@ enum Tickers : uint32_t { // Number of internal keys skipped by Iterator NUMBER_ITER_SKIP, + // BlobDB specific stats + // # of Put/PutTTL/PutUntil to BlobDB. + BLOB_DB_NUM_PUT, + // # of Write to BlobDB. + BLOB_DB_NUM_WRITE, + // # of Get to BlobDB. + BLOB_DB_NUM_GET, + // # of MultiGet to BlobDB. + BLOB_DB_NUM_MULTIGET, + // # of Seek/SeekToFirst/SeekToLast/SeekForPrev to BlobDB iterator. + BLOB_DB_NUM_SEEK, + // # of Next to BlobDB iterator. + BLOB_DB_NUM_NEXT, + // # of Prev to BlobDB iterator. + BLOB_DB_NUM_PREV, + // # of keys written to BlobDB. + BLOB_DB_NUM_KEYS_WRITTEN, + // # of keys read from BlobDB. + BLOB_DB_NUM_KEYS_READ, + // # of bytes (key + value) written to BlobDB. + BLOB_DB_BYTES_WRITTEN, + // # of bytes (key + value) read from BlobDB. + BLOB_DB_BYTES_READ, + // # of keys written by BlobDB as non-TTL inlined value. + BLOB_DB_WRITE_INLINED, + // # of keys written by BlobDB as TTL inlined value. + BLOB_DB_WRITE_INLINED_TTL, + // # of keys written by BlobDB as non-TTL blob value. + BLOB_DB_WRITE_BLOB, + // # of keys written by BlobDB as TTL blob value. + BLOB_DB_WRITE_BLOB_TTL, + // # of bytes written to blob file. + BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + // # of bytes read from blob file. + BLOB_DB_BLOB_FILE_BYTES_READ, + // # of times a blob file being synced. + BLOB_DB_BLOB_FILE_SYNCED, + // # of blob index evicted from base DB by BlobDB compaction filter because + // of expiration. + BLOB_DB_BLOB_INDEX_EXPIRED, + // # of blob files being garbage collected. + BLOB_DB_GC_NUM_FILES, + // # of blob files generated by garbage collection. + BLOB_DB_GC_NUM_NEW_FILES, + // # of BlobDB garbage collection failures. + BLOB_DB_GC_FAILURES, + // # of keys dropped by BlobDB garbage collection because they had been + // overwritten. 
+ BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, + // # of keys dropped by BlobDB garbage collection because of expiration. + BLOB_DB_GC_NUM_KEYS_EXPIRED, + // # of keys relocated to new blob file by garbage collection. + BLOB_DB_GC_NUM_KEYS_RELOCATED, + // # of bytes dropped by BlobDB garbage collection because they had been + // overwritten. + BLOB_DB_GC_BYTES_OVERWRITTEN, + // # of bytes dropped by BlobDB garbage collection because of expiration. + BLOB_DB_GC_BYTES_EXPIRED, + // # of bytes relocated to new blob file by garbage collection. + BLOB_DB_GC_BYTES_RELOCATED, + // # of blob files evicted because BlobDB is full. + BLOB_DB_FIFO_NUM_FILES_EVICTED, + // # of keys in the blob files evicted because BlobDB is full. + BLOB_DB_FIFO_NUM_KEYS_EVICTED, + // # of bytes in the blob files evicted because BlobDB is full. + BLOB_DB_FIFO_BYTES_EVICTED, + TICKER_ENUM_MAX }; @@ -268,9 +335,9 @@ const std::vector> TickersNameMap = { {COMPACTION_KEY_DROP_RANGE_DEL, "rocksdb.compaction.key.drop.range_del"}, {COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user"}, {COMPACTION_RANGE_DEL_DROP_OBSOLETE, - "rocksdb.compaction.range_del.drop.obsolete"}, + "rocksdb.compaction.range_del.drop.obsolete"}, {COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, - "rocksdb.compaction.optimized.del.drop.obsolete"}, + "rocksdb.compaction.optimized.del.drop.obsolete"}, {NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"}, {NUMBER_KEYS_READ, "rocksdb.number.keys.read"}, {NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"}, @@ -332,6 +399,37 @@ const std::vector> TickersNameMap = { {READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"}, {NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"}, {NUMBER_ITER_SKIP, "rocksdb.number.iter.skip"}, + {BLOB_DB_NUM_PUT, "rocksdb.blobdb.num.put"}, + {BLOB_DB_NUM_WRITE, "rocksdb.blobdb.num.write"}, + {BLOB_DB_NUM_GET, "rocksdb.blobdb.num.get"}, + {BLOB_DB_NUM_MULTIGET, "rocksdb.blobdb.num.multiget"}, + {BLOB_DB_NUM_SEEK, "rocksdb.blobdb.num.seek"}, + 
{BLOB_DB_NUM_NEXT, "rocksdb.blobdb.num.next"}, + {BLOB_DB_NUM_PREV, "rocksdb.blobdb.num.prev"}, + {BLOB_DB_NUM_KEYS_WRITTEN, "rocksdb.blobdb.num.keys.written"}, + {BLOB_DB_NUM_KEYS_READ, "rocksdb.blobdb.num.keys.read"}, + {BLOB_DB_BYTES_WRITTEN, "rocksdb.blobdb.bytes.written"}, + {BLOB_DB_BYTES_READ, "rocksdb.blobdb.bytes.read"}, + {BLOB_DB_WRITE_INLINED, "rocksdb.blobdb.write.inlined"}, + {BLOB_DB_WRITE_INLINED_TTL, "rocksdb.blobdb.write.inlined.ttl"}, + {BLOB_DB_WRITE_BLOB, "rocksdb.blobdb.write.blob"}, + {BLOB_DB_WRITE_BLOB_TTL, "rocksdb.blobdb.write.blob.ttl"}, + {BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"}, + {BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"}, + {BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"}, + {BLOB_DB_BLOB_INDEX_EXPIRED, "rocksdb.blobdb.blob.index.expired"}, + {BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"}, + {BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"}, + {BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"}, + {BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, "rocksdb.blobdb.gc.num.keys.overwritten"}, + {BLOB_DB_GC_NUM_KEYS_EXPIRED, "rocksdb.blobdb.gc.num.keys.expired"}, + {BLOB_DB_GC_NUM_KEYS_RELOCATED, "rocksdb.blobdb.gc.num.keys.relocated"}, + {BLOB_DB_GC_BYTES_OVERWRITTEN, "rocksdb.blobdb.gc.bytes.overwritten"}, + {BLOB_DB_GC_BYTES_EXPIRED, "rocksdb.blobdb.gc.bytes.expired"}, + {BLOB_DB_GC_BYTES_RELOCATED, "rocksdb.blobdb.gc.bytes.relocated"}, + {BLOB_DB_FIFO_NUM_FILES_EVICTED, "rocksdb.blobdb.fifo.num.files.evicted"}, + {BLOB_DB_FIFO_NUM_KEYS_EVICTED, "rocksdb.blobdb.fifo.num.keys.evicted"}, + {BLOB_DB_FIFO_BYTES_EVICTED, "rocksdb.blobdb.fifo.bytes.evicted"}, }; /** @@ -383,6 +481,36 @@ enum Histograms : uint32_t { // requests. READ_NUM_MERGE_OPERANDS, + // BlobDB specific stats + // Size of keys written to BlobDB. + BLOB_DB_KEY_SIZE, + // Size of values written to BlobDB. + BLOB_DB_VALUE_SIZE, + // BlobDB Put/PutWithTTL/PutUntil/Write latency. 
+ BLOB_DB_WRITE_MICROS, + // BlobDB Get latency. + BLOB_DB_GET_MICROS, + // BlobDB MultiGet latency. + BLOB_DB_MULTIGET_MICROS, + // BlobDB Seek/SeekToFirst/SeekToLast/SeekForPrev latency. + BLOB_DB_SEEK_MICROS, + // BlobDB Next latency. + BLOB_DB_NEXT_MICROS, + // BlobDB Prev latency. + BLOB_DB_PREV_MICROS, + // Blob file write latency. + BLOB_DB_BLOB_FILE_WRITE_MICROS, + // Blob file read latency. + BLOB_DB_BLOB_FILE_READ_MICROS, + // Blob file sync latency. + BLOB_DB_BLOB_FILE_SYNC_MICROS, + // BlobDB garbage collection time. + BLOB_DB_GC_MICROS, + // BlobDB compression time. + BLOB_DB_COMPRESSION_MICROS, + // BlobDB decompression time. + BLOB_DB_DECOMPRESSION_MICROS, + HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match }; @@ -418,6 +546,20 @@ const std::vector> HistogramsNameMap = { {COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"}, {DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"}, {READ_NUM_MERGE_OPERANDS, "rocksdb.read.num.merge_operands"}, + {BLOB_DB_KEY_SIZE, "rocksdb.blobdb.key.size"}, + {BLOB_DB_VALUE_SIZE, "rocksdb.blobdb.value.size"}, + {BLOB_DB_WRITE_MICROS, "rocksdb.blobdb.write.micros"}, + {BLOB_DB_GET_MICROS, "rocksdb.blobdb.get.micros"}, + {BLOB_DB_MULTIGET_MICROS, "rocksdb.blobdb.multiget.micros"}, + {BLOB_DB_SEEK_MICROS, "rocksdb.blobdb.seek.micros"}, + {BLOB_DB_NEXT_MICROS, "rocksdb.blobdb.next.micros"}, + {BLOB_DB_PREV_MICROS, "rocksdb.blobdb.prev.micros"}, + {BLOB_DB_BLOB_FILE_WRITE_MICROS, "rocksdb.blobdb.blob.file.write.micros"}, + {BLOB_DB_BLOB_FILE_READ_MICROS, "rocksdb.blobdb.blob.file.read.micros"}, + {BLOB_DB_BLOB_FILE_SYNC_MICROS, "rocksdb.blobdb.blob.file.sync.micros"}, + {BLOB_DB_GC_MICROS, "rocksdb.blobdb.gc.micros"}, + {BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"}, + {BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"}, }; struct HistogramData { diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h 
index 26cd188fe..192a338ff 100644 --- a/utilities/blob_db/blob_compaction_filter.h +++ b/utilities/blob_db/blob_compaction_filter.h @@ -5,6 +5,7 @@ #pragma once #ifndef ROCKSDB_LITE +#include "monitoring/statistics.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" #include "utilities/blob_db/blob_index.h" @@ -15,8 +16,12 @@ namespace blob_db { // CompactionFilter to delete expired blob index from base DB. class BlobIndexCompactionFilter : public CompactionFilter { public: - explicit BlobIndexCompactionFilter(uint64_t current_time) - : current_time_(current_time) {} + BlobIndexCompactionFilter(uint64_t current_time, Statistics* statistics) + : current_time_(current_time), statistics_(statistics) {} + + virtual ~BlobIndexCompactionFilter() { + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED, expired_count_); + } virtual const char* Name() const override { return "BlobIndexCompactionFilter"; @@ -40,6 +45,7 @@ class BlobIndexCompactionFilter : public CompactionFilter { } if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { // Expired + expired_count_++; return Decision::kRemove; } return Decision::kKeep; @@ -47,11 +53,16 @@ class BlobIndexCompactionFilter : public CompactionFilter { private: const uint64_t current_time_; + Statistics* statistics_; + // It is safe to not use std::atomic since the compaction filter, created + // from a compaction filter factory, will not be called from multiple threads. 
+ mutable uint64_t expired_count_ = 0; }; class BlobIndexCompactionFilterFactory : public CompactionFilterFactory { public: - explicit BlobIndexCompactionFilterFactory(Env* env) : env_(env) {} + BlobIndexCompactionFilterFactory(Env* env, Statistics* statistics) + : env_(env), statistics_(statistics) {} virtual const char* Name() const override { return "BlobIndexCompactionFilterFactory"; @@ -65,12 +76,13 @@ class BlobIndexCompactionFilterFactory : public CompactionFilterFactory { return nullptr; } assert(current_time >= 0); - return std::unique_ptr( - new BlobIndexCompactionFilter(static_cast(current_time))); + return std::unique_ptr(new BlobIndexCompactionFilter( + static_cast(current_time), statistics_)); } private: Env* env_; + Statistics* statistics_; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index b278df77f..2ba4f23cd 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -70,7 +70,8 @@ Status BlobDB::OpenAndLoad(const Options& options, } changed_options->compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(options.env)); + new BlobIndexCompactionFilterFactory(options.env, + options.statistics.get())); changed_options->listeners.emplace_back(fblistener); if (bdb_options.enable_garbage_collection) { changed_options->listeners.emplace_back(ce_listener); @@ -163,7 +164,8 @@ Status BlobDB::Open(const DBOptions& db_options_input, return Status::NotSupported("Blob DB doesn't support compaction filter."); } cf_options.compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(db_options.env)); + new BlobIndexCompactionFilterFactory(db_options.env, + db_options.statistics.get())); ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options); // we need to open blob db first so that recovery can happen diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 98dbd35fe..fc8110d22 100644 --- 
a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -14,6 +14,7 @@ #include "db/db_impl.h" #include "db/write_batch_internal.h" #include "monitoring/instrumented_mutex.h" +#include "monitoring/statistics.h" #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -30,6 +31,7 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/random.h" +#include "util/stop_watch.h" #include "util/sync_point.h" #include "util/timer_queue.h" #include "utilities/blob_db/blob_db_iterator.h" @@ -106,16 +108,13 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, bdb_options_(blob_db_options), db_options_(db_options), env_options_(db_options), + statistics_(db_options_.statistics.get()), dir_change_(false), next_file_number_(1), epoch_of_(0), shutdown_(false), current_epoch_(0), open_file_count_(0), - last_period_write_(0), - last_period_ampl_(0), - total_periods_write_(0), - total_periods_ampl_(0), total_blob_space_(0), open_p1_done_(false), debug_level_(0), @@ -163,16 +162,13 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) bdb_options_(blob_db_options), db_options_(db->GetOptions()), env_options_(db_->GetOptions()), + statistics_(db_options_.statistics.get()), dir_change_(false), next_file_number_(1), epoch_of_(0), shutdown_(false), current_epoch_(0), open_file_count_(0), - last_period_write_(0), - last_period_ampl_(0), - total_periods_write_(0), - total_periods_ampl_(0), total_blob_space_(0), oldest_file_evicted_(false) { if (!bdb_options_.blob_dir.empty()) @@ -227,8 +223,6 @@ void BlobDBImpl::StartBackgroundTasks() { std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); tqueue_.add(kSanityCheckPeriodMillisecs, std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1)); - tqueue_.add(kWriteAmplificationStatsPeriodMillisecs, - std::bind(&BlobDBImpl::WaStats, this, std::placeholders::_1)); tqueue_.add(kFSyncFilesPeriodMillisecs, 
std::bind(&BlobDBImpl::FsyncFiles, this, std::placeholders::_1)); tqueue_.add( @@ -490,8 +484,8 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { } bfile->log_writer_ = std::make_shared( - std::move(fwriter), bfile->file_number_, bdb_options_.bytes_per_sync, - db_options_.use_fsync, boffset); + std::move(fwriter), env_, statistics_, bfile->file_number_, + bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset); bfile->log_writer_->last_elem_type_ = et; return s; @@ -745,7 +739,8 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler { }; Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { - + StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_WRITE); uint32_t default_cf_id = reinterpret_cast(DefaultColumnFamily())->GetID(); // TODO(yiwu): In case there are multiple writers the latest sequence would @@ -856,6 +851,8 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options, Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) { + StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_PUT); TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); Status s; WriteBatch batch; @@ -888,11 +885,13 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, if (expiration == kNoExpiration) { // Put as normal value s = batch->Put(key, value); + RecordTick(statistics_, BLOB_DB_WRITE_INLINED); } else { // Inlined with TTL BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value); s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, index_entry); + RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL); } } else { std::shared_ptr bfile = (expiration != kNoExpiration) @@ -911,6 +910,11 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, 
&index_entry); + if (expiration == kNoExpiration) { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB); + } else { + RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL); + } if (s.ok()) { bfile->ExtendSequenceRange(sequence); @@ -932,6 +936,11 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, } } + RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN); + RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size()); + MeasureTime(statistics_, BLOB_DB_KEY_SIZE, key.size()); + MeasureTime(statistics_, BLOB_DB_VALUE_SIZE, value.size()); + return s; } @@ -940,6 +949,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, if (bdb_options_.compression == kNoCompression) { return raw; } + StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); CompressionType ct = bdb_options_.compression; CompressionOptions compression_opts; CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat, @@ -991,6 +1001,11 @@ bool BlobDBImpl::EvictOldestBlobFile() { oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second); obsolete_files_.push_back(oldest_file); oldest_file_evicted_.store(true); + RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); + RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED, + oldest_file->BlobCount()); + RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED, + oldest_file->GetFileSize()); return true; } @@ -1048,7 +1063,6 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, bfile->blob_count_++; bfile->file_size_ += size_put; - last_period_write_ += size_put; total_blob_space_ += size_put; if (expiration == kNoExpiration) { @@ -1066,6 +1080,8 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, std::vector BlobDBImpl::MultiGet( const ReadOptions& read_options, const std::vector& keys, std::vector* values) { + StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_MULTIGET); // Get a snapshot to avoid blob file get 
deleted between we // fetch and index entry and reading from the file. ReadOptions ro(read_options); @@ -1169,7 +1185,12 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, char* buffer = &(*valueptr)[0]; Slice blob_value; - s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, buffer); + { + StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); + s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, + buffer); + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_value.size()); + } if (!s.ok() || blob_value.size() != blob_index.size()) { if (debug_level_ >= 2) { ROCKS_LOG_ERROR(db_options_.info_log, @@ -1218,10 +1239,14 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, if (bfile->compression() != kNoCompression) { BlockContents contents; auto cfh = reinterpret_cast(DefaultColumnFamily()); - s = UncompressBlockContentsForCompressionType( - blob_value.data(), blob_value.size(), &contents, - kBlockBasedTableVersionFormat, Slice(), bfile->compression(), - *(cfh->cfd()->ioptions())); + { + StopWatch decompression_sw(env_, statistics_, + BLOB_DB_DECOMPRESSION_MICROS); + s = UncompressBlockContentsForCompressionType( + blob_value.data(), blob_value.size(), &contents, + kBlockBasedTableVersionFormat, Slice(), bfile->compression(), + *(cfh->cfd()->ioptions())); + } *(value->GetSelf()) = contents.data.ToString(); } @@ -1233,6 +1258,14 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, Status BlobDBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { + StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_GET); + return GetImpl(read_options, column_family, key, value); +} + +Status BlobDBImpl::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { if (column_family != 
DefaultColumnFamily()) { return Status::NotSupported( "Blob DB doesn't support non-default column family."); @@ -1258,6 +1291,8 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, if (snapshot_created) { db_->ReleaseSnapshot(ro.snapshot); } + RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ); + RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size()); return s; } @@ -1546,35 +1581,6 @@ std::pair BlobDBImpl::ReclaimOpenFiles(bool aborted) { return std::make_pair(true, -1); } -// TODO(yiwu): correct the stats and expose it. -std::pair BlobDBImpl::WaStats(bool aborted) { - if (aborted) return std::make_pair(false, -1); - - WriteLock wl(&mutex_); - - if (all_periods_write_.size() >= kWriteAmplificationStatsPeriods) { - total_periods_write_ -= (*all_periods_write_.begin()); - total_periods_ampl_ = (*all_periods_ampl_.begin()); - - all_periods_write_.pop_front(); - all_periods_ampl_.pop_front(); - } - - uint64_t val1 = last_period_write_.load(); - uint64_t val2 = last_period_ampl_.load(); - - all_periods_write_.push_back(val1); - all_periods_ampl_.push_back(val2); - - last_period_write_ = 0; - last_period_ampl_ = 0; - - total_periods_write_ += val1; - total_periods_ampl_ += val2; - - return std::make_pair(true, -1); -} - // Write callback for garbage collection to check if key has been updated // since last read. Similar to how OptimisticTransaction works. See inline // comment in GCFileAndUpdateLSM(). @@ -1635,6 +1641,7 @@ class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback { // DELETED in the LSM Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, GCStats* gc_stats) { + StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS); uint64_t now = EpochNow(); std::shared_ptr reader = @@ -1727,6 +1734,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, if (get_status.IsNotFound() || !is_blob_index) { // Either the key is deleted or updated with a newer version whish is // inlined in LSM. 
+ gc_stats->num_keys_overwritten++; + gc_stats->bytes_overwritten += record.record_size(); continue; } @@ -1742,6 +1751,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, blob_index.file_number() != bfptr->BlobFileNumber() || blob_index.offset() != blob_offset) { // Key has been overwritten. Drop the blob record. + gc_stats->num_keys_overwritten++; + gc_stats->bytes_overwritten += record.record_size(); continue; } @@ -1751,8 +1762,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter. // We can just drop the blob record. if (no_relocation_ttl || (has_ttl && now >= record.expiration)) { - gc_stats->num_deletes++; - gc_stats->deleted_size += record.value_size; + gc_stats->num_keys_expired++; + gc_stats->bytes_expired += record.record_size(); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); WriteBatch delete_batch; Status delete_status = delete_batch.Delete(record.key); @@ -1760,12 +1771,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, delete_status = db_impl_->WriteWithCallback(WriteOptions(), &delete_batch, &callback); } - if (delete_status.ok()) { - gc_stats->delete_succeeded++; - } else if (delete_status.IsBusy()) { - // The key is overwritten in the meanwhile. Drop the blob record. - gc_stats->overwritten_while_delete++; - } else { + if (!delete_status.ok() && !delete_status.IsBusy()) { // We hit an error. 
s = delete_status; ROCKS_LOG_ERROR(db_options_.info_log, @@ -1788,7 +1794,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, std::string reason("GC of "); reason += bfptr->PathName(); newfile = NewBlobFile(reason); - gc_stats->newfile = newfile; new_writer = CheckOrCreateWriterLocked(newfile); newfile->header_ = std::move(header); @@ -1810,9 +1815,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); } - gc_stats->num_relocate++; std::string new_index_entry; - uint64_t new_blob_offset = 0; uint64_t new_key_offset = 0; // write the blob to the blob log. @@ -1838,10 +1841,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, if (rewrite_status.ok()) { newfile->ExtendSequenceRange( WriteBatchInternal::Sequence(&rewrite_batch)); - gc_stats->relocate_succeeded++; + gc_stats->num_keys_relocated++; + gc_stats->bytes_relocated += record.record_size(); } else if (rewrite_status.IsBusy()) { // The key is overwritten in the meanwhile. Drop the blob record. - gc_stats->overwritten_while_relocate++; + gc_stats->num_keys_overwritten++; + gc_stats->bytes_overwritten += record.record_size(); } else { // We hit an error. s = rewrite_status; @@ -1864,17 +1869,34 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, ROCKS_LOG_INFO( db_options_.info_log, - "%s blob file %" PRIu64 - ". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64 " succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.", + "%s blob file %" PRIu64 ". Total blob records: %" PRIu64 + ", Expired: %" PRIu64 " keys/%" PRIu64 " bytes, Overwritten: %" PRIu64 + " keys/%" PRIu64 " bytes, Relocated: %" PRIu64 " keys/%" PRIu64 " bytes.", s.ok() ? 
"Successfully garbage collected" : "Failed to garbage collect", - bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->delete_succeeded, - gc_stats->num_deletes, gc_stats->relocate_succeeded, - gc_stats->num_relocate); + bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired, + gc_stats->bytes_expired, gc_stats->num_keys_overwritten, + gc_stats->bytes_overwritten, gc_stats->num_keys_relocated, + gc_stats->bytes_relocated); + RecordTick(statistics_, BLOB_DB_GC_NUM_FILES); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, + gc_stats->num_keys_overwritten); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED, + gc_stats->num_keys_expired); + RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN, + gc_stats->bytes_overwritten); + RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired); if (newfile != nullptr) { total_blob_space_ += newfile->file_size_; ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", newfile->BlobFileNumber()); + RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED, + gc_stats->num_keys_relocated); + RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED, + gc_stats->bytes_relocated); + } + if (!s.ok()) { + RecordTick(statistics_, BLOB_DB_GC_FAILURES); } return s; } @@ -2120,8 +2142,10 @@ std::pair BlobDBImpl::RunGC(bool aborted) { if (bfile->gc_once_after_open_.load()) { WriteLock lockbfile_w(&bfile->mutex_); - bfile->deleted_size_ = gc_stats.deleted_size; - bfile->deleted_count_ = gc_stats.num_deletes; + bfile->deleted_size_ = + gc_stats.bytes_overwritten + gc_stats.bytes_expired; + bfile->deleted_count_ = + gc_stats.num_keys_overwritten + gc_stats.num_keys_expired; bfile->gc_once_after_open_ = false; } } @@ -2144,7 +2168,7 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { auto* iter = db_impl_->NewIteratorImpl( read_options, cfd, snapshot->GetSequenceNumber(), nullptr /*read_callback*/, true /*allow_blob*/); - 
return new BlobDBIterator(own_snapshot, iter, this); + return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_); } Status DestroyBlobDB(const std::string& dbname, const Options& options, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 9881107d3..2a2eb83a2 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -24,6 +24,7 @@ #include "rocksdb/db.h" #include "rocksdb/listener.h" #include "rocksdb/options.h" +#include "rocksdb/statistics.h" #include "rocksdb/wal_filter.h" #include "util/mpsc.h" #include "util/mutexlock.h" @@ -135,16 +136,12 @@ struct blobf_compare_ttl { struct GCStats { uint64_t blob_count = 0; - uint64_t num_deletes = 0; - uint64_t deleted_size = 0; - uint64_t retry_delete = 0; - uint64_t delete_succeeded = 0; - uint64_t overwritten_while_delete = 0; - uint64_t num_relocate = 0; - uint64_t retry_relocate = 0; - uint64_t relocate_succeeded = 0; - uint64_t overwritten_while_relocate = 0; - std::shared_ptr newfile = nullptr; + uint64_t num_keys_overwritten = 0; + uint64_t num_keys_expired = 0; + uint64_t num_keys_relocated = 0; + uint64_t bytes_overwritten = 0; + uint64_t bytes_expired = 0; + uint64_t bytes_relocated = 0; }; /** @@ -178,10 +175,6 @@ class BlobDBImpl : public BlobDB { // how many periods of stats do we keep. static constexpr uint32_t kWriteAmplificationStatsPeriods = 24; - // what is the length of any period - static constexpr uint32_t kWriteAmplificationStatsPeriodMillisecs = - 3600 * 1000; - // we will garbage collect blob files in // which entire files have expired. However if the // ttl_range of files is very large say a day, we @@ -292,6 +285,10 @@ class BlobDBImpl : public BlobDB { // Return true if a snapshot is created. 
bool SetSnapshotIfNeeded(ReadOptions* read_options); + Status GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value); + Status GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); @@ -364,9 +361,6 @@ class BlobDBImpl : public BlobDB { // efficiency std::pair ReclaimOpenFiles(bool aborted); - // periodically print write amplification statistics - std::pair WaStats(bool aborted); - // background task to do book-keeping of deleted keys std::pair EvictDeletions(bool aborted); @@ -444,6 +438,9 @@ class BlobDBImpl : public BlobDB { DBOptions db_options_; EnvOptions env_options_; + // Raw pointer of statistic. db_options_ has a shared_ptr to hold ownership. + Statistics* statistics_; + // name of the database directory std::string dbname_; @@ -519,18 +516,6 @@ class BlobDBImpl : public BlobDB { // counter is used to monitor and close excess RA files. std::atomic open_file_count_; - // should hold mutex to modify - // STATISTICS for WA of Blob Files due to GC - // collect by default 24 hourly periods - std::list all_periods_write_; - std::list all_periods_ampl_; - - std::atomic last_period_write_; - std::atomic last_period_ampl_; - - uint64_t total_periods_write_; - uint64_t total_periods_ampl_; - // total size of all blob files at a given time std::atomic total_blob_space_; std::list> obsolete_files_; diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index c8aa1ff17..f901df366 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -6,7 +6,9 @@ #pragma once #ifndef ROCKSDB_LITE +#include "monitoring/statistics.h" #include "rocksdb/iterator.h" +#include "util/stop_watch.h" #include "utilities/blob_db/blob_db_impl.h" namespace rocksdb { @@ -17,8 +19,12 @@ using rocksdb::ManagedSnapshot; class BlobDBIterator : public Iterator { public: BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter, - 
BlobDBImpl* blob_db) - : snapshot_(snapshot), iter_(iter), blob_db_(blob_db) {} + BlobDBImpl* blob_db, Env* env, Statistics* statistics) + : snapshot_(snapshot), + iter_(iter), + blob_db_(blob_db), + env_(env), + statistics_(statistics) {} virtual ~BlobDBIterator() = default; @@ -37,33 +43,45 @@ class BlobDBIterator : public Iterator { } void SeekToFirst() override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToFirst(); UpdateBlobValue(); } void SeekToLast() override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToLast(); UpdateBlobValue(); } void Seek(const Slice& target) override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->Seek(target); UpdateBlobValue(); } void SeekForPrev(const Slice& target) override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekForPrev(target); UpdateBlobValue(); } void Next() override { assert(Valid()); + StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_NEXT); iter_->Next(); UpdateBlobValue(); } void Prev() override { assert(Valid()); + StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_PREV); iter_->Prev(); UpdateBlobValue(); } @@ -96,6 +114,8 @@ class BlobDBIterator : public Iterator { std::unique_ptr snapshot_; std::unique_ptr iter_; BlobDBImpl* blob_db_; + Env* env_; + Statistics* statistics_; Status status_; PinnableSlice value_; }; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 7b199e305..73ec6ca47 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -260,8 +260,8 @@ TEST_F(BlobDBTest, PutWithTTL) { ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; 
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocate); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -290,8 +290,8 @@ TEST_F(BlobDBTest, PutUntil) { ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocate); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -323,8 +323,8 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(0, gc_stats.num_deletes); - ASSERT_EQ(100, gc_stats.num_relocate); + ASSERT_EQ(0, gc_stats.num_keys_expired); + ASSERT_EQ(100, gc_stats.num_keys_relocated); VerifyDB(data); } @@ -370,8 +370,8 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; - ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocate); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -418,8 +418,8 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; - ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocate); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), 
gc_stats.num_keys_relocated); VerifyDB(data); } @@ -475,8 +475,8 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); - ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocate); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); VerifyDB(data); } @@ -675,8 +675,8 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) { GCStats gc_stats; ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(200, gc_stats.blob_count); - ASSERT_EQ(0, gc_stats.num_deletes); - ASSERT_EQ(200 - new_keys, gc_stats.num_relocate); + ASSERT_EQ(0, gc_stats.num_keys_expired); + ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated); VerifyDB(data); } @@ -704,10 +704,9 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { GCStats gc_stats; ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(0, gc_stats.num_deletes); - ASSERT_EQ(1, gc_stats.num_relocate); - ASSERT_EQ(0, gc_stats.relocate_succeeded); - ASSERT_EQ(1, gc_stats.overwritten_while_relocate); + ASSERT_EQ(0, gc_stats.num_keys_expired); + ASSERT_EQ(1, gc_stats.num_keys_overwritten); + ASSERT_EQ(0, gc_stats.num_keys_relocated); writer.join(); VerifyDB({{"foo", "v2"}}); } @@ -741,10 +740,8 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { GCStats gc_stats; ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(1, gc_stats.num_deletes); - ASSERT_EQ(0, gc_stats.delete_succeeded); - ASSERT_EQ(1, gc_stats.overwritten_while_delete); - ASSERT_EQ(0, gc_stats.num_relocate); + ASSERT_EQ(1, gc_stats.num_keys_expired); + ASSERT_EQ(0, gc_stats.num_keys_relocated); writer.join(); VerifyDB({{"foo", "v2"}}); } @@ -838,8 +835,7 @@ 
TEST_F(BlobDBTest, ReadWhileGC) { GCStats gc_stats; ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); - ASSERT_EQ(1, gc_stats.num_relocate); - ASSERT_EQ(1, gc_stats.relocate_succeeded); + ASSERT_EQ(1, gc_stats.num_keys_relocated); blob_db_impl()->TEST_DeleteObsoleteFiles(); // The file shouln't be deleted blob_files = blob_db_impl()->TEST_GetBlobFiles(); @@ -904,11 +900,11 @@ TEST_F(BlobDBTest, SnapshotAndGarbageCollection) { ASSERT_TRUE(bfile->Obsolete()); ASSERT_EQ(1, gc_stats.blob_count); if (delete_key) { - ASSERT_EQ(0, gc_stats.num_relocate); + ASSERT_EQ(0, gc_stats.num_keys_relocated); ASSERT_EQ(bfile->GetSequenceRange().second + 1, bfile->GetObsoleteSequence()); } else { - ASSERT_EQ(1, gc_stats.num_relocate); + ASSERT_EQ(1, gc_stats.num_keys_relocated); ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), bfile->GetObsoleteSequence()); } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 0b2aa8d3d..3a7812132 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -100,8 +100,8 @@ std::shared_ptr BlobFile::OpenSequentialReader( std::unique_ptr sfile_reader; sfile_reader.reset(new SequentialFileReader(std::move(sfile))); - std::shared_ptr log_reader = - std::make_shared(db_options.info_log, std::move(sfile_reader)); + std::shared_ptr log_reader = std::make_shared( + std::move(sfile_reader), db_options.env, db_options.statistics.get()); return log_reader; } diff --git a/utilities/blob_db/blob_log_format.h b/utilities/blob_db/blob_log_format.h index 0b5cff547..a770aa765 100644 --- a/utilities/blob_db/blob_log_format.h +++ b/utilities/blob_db/blob_log_format.h @@ -111,6 +111,8 @@ struct BlobLogRecord { std::string key_buf; std::string value_buf; + uint64_t record_size() const { return kHeaderSize + key_size + value_size; } + void EncodeHeaderTo(std::string* dst); Status DecodeHeaderFrom(Slice src); diff --git a/utilities/blob_db/blob_log_reader.cc 
b/utilities/blob_db/blob_log_reader.cc index a2421b930..beec373f9 100644 --- a/utilities/blob_db/blob_log_reader.cc +++ b/utilities/blob_db/blob_log_reader.cc @@ -9,22 +9,30 @@ #include +#include "monitoring/statistics.h" #include "util/file_reader_writer.h" +#include "util/stop_watch.h" namespace rocksdb { namespace blob_db { -Reader::Reader(std::shared_ptr info_log, - unique_ptr&& _file) - : info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) {} +Reader::Reader(unique_ptr&& file_reader, Env* env, + Statistics* statistics) + : file_(std::move(file_reader)), + env_(env), + statistics_(statistics), + buffer_(), + next_byte_(0) {} Status Reader::ReadSlice(uint64_t size, Slice* slice, std::string* buf) { + StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); buf->reserve(size); Status s = file_->Read(size, slice, &(*buf)[0]); next_byte_ += size; if (!s.ok()) { return s; } + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size()); if (slice->size() != size) { return Status::Corruption("EOF reached while reading record"); } diff --git a/utilities/blob_db/blob_log_reader.h b/utilities/blob_db/blob_log_reader.h index 9c76b92ae..72015ef1b 100644 --- a/utilities/blob_db/blob_log_reader.h +++ b/utilities/blob_db/blob_log_reader.h @@ -10,7 +10,9 @@ #include #include +#include "rocksdb/env.h" #include "rocksdb/slice.h" +#include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "utilities/blob_db/blob_log_format.h" @@ -37,17 +39,8 @@ class Reader { // Create a reader that will return log records from "*file". // "*file" must remain live while this Reader is in use. - // - // If "reporter" is non-nullptr, it is notified whenever some data is - // dropped due to a detected corruption. "*reporter" must remain - // live while this Reader is in use. - // - // If "checksum" is true, verify checksums if available. 
- // - // The Reader will start reading at the first record located at physical - // position >= initial_offset within the file. - Reader(std::shared_ptr info_log, - std::unique_ptr&& file); + Reader(std::unique_ptr&& file_reader, Env* env, + Statistics* statistics); ~Reader() = default; @@ -68,17 +61,14 @@ class Reader { Status ReadSlice(uint64_t size, Slice* slice, std::string* buf); - SequentialFileReader* file() { return file_.get(); } - void ResetNextByte() { next_byte_ = 0; } uint64_t GetNextByte() const { return next_byte_; } - const SequentialFileReader* file_reader() const { return file_.get(); } - private: - std::shared_ptr info_log_; const std::unique_ptr file_; + Env* env_; + Statistics* statistics_; std::string backing_store_; Slice buffer_; diff --git a/utilities/blob_db/blob_log_writer.cc b/utilities/blob_db/blob_log_writer.cc index 806ca3c95..90e648290 100644 --- a/utilities/blob_db/blob_log_writer.cc +++ b/utilities/blob_db/blob_log_writer.cc @@ -8,17 +8,23 @@ #include #include + +#include "monitoring/statistics.h" #include "rocksdb/env.h" #include "util/coding.h" #include "util/file_reader_writer.h" +#include "util/stop_watch.h" #include "utilities/blob_db/blob_log_format.h" namespace rocksdb { namespace blob_db { -Writer::Writer(unique_ptr&& dest, uint64_t log_number, - uint64_t bpsync, bool use_fs, uint64_t boffset) +Writer::Writer(unique_ptr&& dest, Env* env, + Statistics* statistics, uint64_t log_number, uint64_t bpsync, + bool use_fs, uint64_t boffset) : dest_(std::move(dest)), + env_(env), + statistics_(statistics), log_number_(log_number), block_offset_(boffset), bytes_per_sync_(bpsync), @@ -26,7 +32,11 @@ Writer::Writer(unique_ptr&& dest, uint64_t log_number, use_fsync_(use_fs), last_elem_type_(kEtNone) {} -void Writer::Sync() { dest_->Sync(use_fsync_); } +void Writer::Sync() { + StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); + dest_->Sync(use_fsync_); + RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); +} Status 
Writer::WriteHeader(BlobLogHeader& header) { assert(block_offset_ == 0); @@ -40,6 +50,8 @@ Status Writer::WriteHeader(BlobLogHeader& header) { s = dest_->Flush(); } last_elem_type_ = kEtFileHdr; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogHeader::kSize); return s; } @@ -58,6 +70,8 @@ Status Writer::AppendFooter(BlobLogFooter& footer) { } last_elem_type_ = kEtFileFooter; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogFooter::kSize); return s; } @@ -98,6 +112,7 @@ void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, Status Writer::EmitPhysicalRecord(const std::string& headerbuf, const Slice& key, const Slice& val, uint64_t* key_offset, uint64_t* blob_offset) { + StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS); Status s = dest_->Append(Slice(headerbuf)); if (s.ok()) { s = dest_->Append(key); @@ -113,6 +128,8 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf, *blob_offset = *key_offset + key.size(); block_offset_ = *blob_offset + val.size(); last_elem_type_ = kEtRecord; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogRecord::kHeaderSize + key.size() + val.size()); return s; } diff --git a/utilities/blob_db/blob_log_writer.h b/utilities/blob_db/blob_log_writer.h index 2a1f05e1b..a41bd2c5a 100644 --- a/utilities/blob_db/blob_log_writer.h +++ b/utilities/blob_db/blob_log_writer.h @@ -10,7 +10,9 @@ #include #include +#include "rocksdb/env.h" #include "rocksdb/slice.h" +#include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/types.h" #include "utilities/blob_db/blob_log_format.h" @@ -34,9 +36,9 @@ class Writer { // Create a writer that will append data to "*dest". // "*dest" must be initially empty. // "*dest" must remain live while this Writer is in use. 
- explicit Writer(std::unique_ptr&& dest, - uint64_t log_number, uint64_t bpsync, bool use_fsync, - uint64_t boffset = 0); + Writer(std::unique_ptr&& dest, Env* env, + Statistics* statistics, uint64_t log_number, uint64_t bpsync, + bool use_fsync, uint64_t boffset = 0); ~Writer() = default; @@ -75,6 +77,8 @@ class Writer { private: std::unique_ptr dest_; + Env* env_; + Statistics* statistics_; uint64_t log_number_; uint64_t block_offset_; // Current offset in block uint64_t bytes_per_sync_;