// rocksdb/db/compaction_job.h

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
#include <string>
#include <functional>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/compaction_job_stats.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/event_logger.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {
// Forward declarations (alphabetical) for types that this header only needs
// to name, not define.
class Arena;
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class CompactionJob {
public:
CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
const EnvOptions& env_options, VersionSet* versions,
std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats,
std::vector<SequenceNumber> existing_snapshots,
std::shared_ptr<Cache> table_cache,
std::function<uint64_t()> yield_callback,
EventLogger* event_logger, bool paranoid_file_checks,
const std::string& dbname,
CompactionJobStats* compaction_job_stats);
Allow GetThreadList() to report operation stage. Summary: Allow GetThreadList() to report operation stage. Test Plan: ./thread_list_test ./db_bench --benchmarks=fillrandom --num=100000 --threads=40 \ --max_background_compactions=10 --max_background_flushes=3 \ --thread_status_per_interval=1000 --key_size=16 --value_size=1000 \ --num_column_families=10 export ROCKSDB_TESTS=ThreadStatus ./db_test Sample output ThreadID ThreadType cfName Operation OP_StartTime ElapsedTime Stage State 140116265861184 Low Pri 140116270055488 Low Pri 140116274249792 High Pri column_family_name_000005 Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116400078912 Low Pri column_family_name_000004 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116358135872 Low Pri column_family_name_000006 Compaction 2015/03/10-14:58:10 1 us CompactionJob::FinishCompactionOutputFile 140116341358656 Low Pri 140116295221312 High Pri default Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116324581440 Low Pri column_family_name_000009 Compaction 2015/03/10-14:58:11 0 us CompactionJob::ProcessKeyValueCompaction 140116278444096 Low Pri 140116299415616 Low Pri column_family_name_000008 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116291027008 High Pri column_family_name_000001 Flush 2015/03/10-14:58:11 0 us FlushJob::WriteLevel0Table 140116286832704 Low Pri column_family_name_000002 Compaction 2015/03/10-14:58:11 0 us CompactionJob::FinishCompactionOutputFile 140116282638400 Low Pri Reviewers: rven, igor, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D34683
10 years ago
~CompactionJob();
// no copy/move
CompactionJob(CompactionJob&& job) = delete;
CompactionJob(const CompactionJob& job) = delete;
CompactionJob& operator=(const CompactionJob& job) = delete;
// REQUIRED: mutex held
void Prepare();
// REQUIRED mutex not held
Status Run();
// REQUIRED: mutex held
// status is the return of Run()
void Install(Status* status, const MutableCFOptions& mutable_cf_options,
InstrumentedMutex* db_mutex);
private:
Allow GetThreadList() to report basic compaction operation properties. Summary: Now we're able to show more details about a compaction in GetThreadList() :) This patch allows GetThreadList() to report basic compaction operation properties. Basic compaction properties include: 1. job id 2. compaction input / output level 3. compaction property flags (is_manual, is_deletion, .. etc) 4. total input bytes 5. the number of bytes has been read currently. 6. the number of bytes has been written currently. Flush operation properties will be done in a seperate diff. Test Plan: /db_bench --threads=30 --num=1000000 --benchmarks=fillrandom --thread_status_per_interval=1 Sample output of tracking same job: ThreadID ThreadType cfName Operation ElapsedTime Stage State OperationProperties 140664171987072 Low Pri default Compaction 31.357 ms CompactionJob::FinishCompactionOutputFile BaseInputLevel 1 | BytesRead 2264663 | BytesWritten 1934241 | IsDeletion 0 | IsManual 0 | IsTrivialMove 0 | JobID 277 | OutputLevel 2 | TotalInputBytes 3964158 | ThreadID ThreadType cfName Operation ElapsedTime Stage State OperationProperties 140664171987072 Low Pri default Compaction 59.440 ms CompactionJob::FinishCompactionOutputFile BaseInputLevel 1 | BytesRead 2264663 | BytesWritten 1934241 | IsDeletion 0 | IsManual 0 | IsTrivialMove 0 | JobID 277 | OutputLevel 2 | TotalInputBytes 3964158 | ThreadID ThreadType cfName Operation ElapsedTime Stage State OperationProperties 140664171987072 Low Pri default Compaction 226.375 ms CompactionJob::Install BaseInputLevel 1 | BytesRead 3958013 | BytesWritten 3621940 | IsDeletion 0 | IsManual 0 | IsTrivialMove 0 | JobID 277 | OutputLevel 2 | TotalInputBytes 3964158 | Reviewers: sdong, rven, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D37653
10 years ago
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
void AllocateCompactionOutputFileNumbers();
// Processes batches of keys with the same prefixes. This is used for
// CompactionFilterV2.
Status ProcessPrefixBatches(ColumnFamilyData* cfd,
int64_t* imm_micros,
Iterator* input,
CompactionFilterV2* compaction_filter_v2);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
bool is_compaction_v2);
Status WriteKeyValue(const Slice& key, const Slice& value,
const ParsedInternalKey& ikey,
const Status& input_status);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
uint64_t* time);
Status FinishCompactionOutputFile(const Status& input_status);
Status InstallCompactionResults(InstrumentedMutex* db_mutex,
const MutableCFOptions& mutable_cf_options);
SequenceNumber findEarliestVisibleSnapshot(
SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
void RecordCompactionIOStats();
Status OpenCompactionOutputFile();
void CleanupCompaction(const Status& status);
void UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const;
void RecordDroppedKeys(int64_t* key_drop_user,
int64_t* key_drop_newer_entry,
int64_t* key_drop_obsolete);
void UpdateCompactionStats();
void UpdateCompactionInputStatsHelper(
int* num_files, uint64_t* bytes_read, int input_level);
void LogCompaction(ColumnFamilyData* cfd, Compaction* compaction);
int job_id_;
// CompactionJob state
struct CompactionState;
CompactionState* compact_;
CompactionJobStats* compaction_job_stats_;
bool bottommost_level_;
SequenceNumber earliest_snapshot_;
SequenceNumber visible_at_tip_;
SequenceNumber latest_snapshot_;
InternalStats::CompactionStats compaction_stats_;
// DBImpl state
const std::string& dbname_;
const DBOptions& db_options_;
const EnvOptions& env_options_;
Env* env_;
VersionSet* versions_;
std::atomic<bool>* shutting_down_;
LogBuffer* log_buffer_;
Directory* db_directory_;
Directory* output_directory_;
Statistics* stats_;
// If there were two snapshots with seq numbers s1 and
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// entirely within s1 and s2, then the earlier version of k1 can be safely
// deleted because that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_;
std::shared_ptr<Cache> table_cache_;
// yield callback
std::function<uint64_t()> yield_callback_;
EventLogger* event_logger_;
bool paranoid_file_checks_;
};
} // namespace rocksdb