Optimizer's skeleton: use advisor to optimize config options (#4169)
Summary: In https://github.com/facebook/rocksdb/pull/3934 we introduced advisor scripts that make suggestions in the config options based on the log file and stats from a run of rocksdb. The optimizer runs the advisor on a benchmark application in a loop and automatically applies the suggested changes until the config options are optimized. This is a work in progress and the patch is the initial skeleton for the optimizer. The sample application that is run in the loop is currently dbbench. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4169 Reviewed By: maysamyabandeh Differential Revision: D9023671 Pulled By: poojam23 fbshipit-source-id: a6192d475c462cf6eb2b316716f97cb400fcb64d (branch: main)
parent
bdc6abd0b4
commit
134a52e144
@ -0,0 +1,39 @@ |
||||
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
||||
# This source code is licensed under both the GPLv2 (found in the |
||||
# COPYING file in the root directory) and Apache 2.0 License |
||||
# (found in the LICENSE.Apache file in the root directory). |
||||
|
||||
from abc import ABC, abstractmethod |
||||
import re |
||||
|
||||
|
||||
class BenchmarkRunner(ABC):
    """Abstract interface for running a benchmark against a Rocksdb setup.

    Concrete subclasses (e.g. DBBenchRunner) must implement
    is_metric_better() and run_experiment(); get_info_log_file_name() is a
    shared helper for locating the Rocksdb LOG file.
    """

    @staticmethod
    @abstractmethod
    def is_metric_better(new_metric, old_metric):
        """Return True if new_metric is at least as good as old_metric."""
        pass

    @abstractmethod
    def run_experiment(self):
        # should return a list of DataSource objects
        pass

    @staticmethod
    def get_info_log_file_name(log_dir, db_path):
        """Return the name of the Rocksdb info LOG file for this db_path.

        Example: DB Path = /dev/shm and OPTIONS file has option
        db_log_dir=/tmp/rocks/, then the name of the log file will be
        'dev_shm_LOG' and its location will be /tmp/rocks. If db_log_dir is
        not specified in the OPTIONS file, then the location of the log file
        will be /dev/shm and the name of the file will be 'LOG'.
        """
        file_name = ''
        if log_dir:
            # refer GetInfoLogPrefix() in rocksdb/util/filename.cc
            # example db_path: /dev/shm/dbbench -> 'dev_shm_dbbench'
            # Fix: use a raw string for the character class ('\-', '\.' are
            # invalid escape sequences in a plain string literal), and let
            # re.sub do the sanitizing in a single pass instead of
            # findall() + repeated str.replace().
            file_name = re.sub(r'[^0-9a-zA-Z\-_\.]', '_', db_path[1:])
            if not file_name.endswith('_'):
                file_name += '_'
        file_name += 'LOG'
        return file_name
@ -0,0 +1,134 @@ |
||||
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
||||
# This source code is licensed under both the GPLv2 (found in the |
||||
# COPYING file in the root directory) and Apache 2.0 License |
||||
# (found in the LICENSE.Apache file in the root directory). |
||||
|
||||
import argparse |
||||
from advisor.db_config_optimizer import ConfigOptimizer |
||||
from advisor.db_log_parser import NO_COL_FAMILY |
||||
from advisor.db_options_parser import DatabaseOptions |
||||
from advisor.rule_parser import RulesSpec |
||||
|
||||
|
||||
# NOTE(review): not referenced in the visible code of this file — presumably
# intended as the optimizer's iteration budget; confirm before removing.
CONFIG_OPT_NUM_ITER = 10


def main(args):
    """Run one end-to-end config-optimization session from parsed CLI args."""
    # set up the parser for the expert-specified rules
    spec_parser = RulesSpec(args.rules_spec)
    # dynamically load the benchmark-runner class named on the command line
    runner_module = __import__(
        args.benchrunner_module, fromlist=[args.benchrunner_class]
    )
    runner_class = getattr(runner_module, args.benchrunner_class)
    # assemble the optional ODS arguments; key_prefix only matters when a
    # client and entity were both supplied
    ods_args = {}
    if args.ods_client and args.ods_entity:
        ods_args['client_script'] = args.ods_client
        ods_args['entity'] = args.ods_entity
        if args.ods_key_prefix:
            ods_args['key_prefix'] = args.ods_key_prefix
    bench_runner = runner_class(args.benchrunner_pos_args, ods_args)
    # load the starting database configuration
    db_options = DatabaseOptions(args.rocksdb_options, args.misc_options)
    # make sure stats are periodically dumped into the LOG file so the
    # advisor has data to work with
    db_log_dump_settings = {
        "DBOptions.stats_dump_period_sec": {
            NO_COL_FAMILY: args.stats_dump_period_sec
        }
    }
    db_options.update_options(db_log_dump_settings)
    # hand everything to the optimizer and let it iterate
    optimizer = ConfigOptimizer(
        bench_runner, db_options, spec_parser, args.base_db_path
    )
    final_db_options = optimizer.run()
    # report where the resulting configuration was written
    print(
        'Final configuration in: ' +
        final_db_options.generate_options_config('final')
    )
    print(
        'Final miscellaneous options: ' +
        repr(final_db_options.get_misc_options())
    )
||||
|
||||
|
||||
if __name__ == '__main__':
    '''
    An example run of this tool from the command-line would look like:
    python3 -m advisor.config_optimizer_example
    --base_db_path=/tmp/rocksdbtest-155919/dbbench
    --rocksdb_options=temp/OPTIONS_boot.tmp --misc_options bloom_bits=2
    --rules_spec=advisor/rules.ini --stats_dump_period_sec=20
    --benchrunner_module=advisor.db_bench_runner
    --benchrunner_class=DBBenchRunner --benchrunner_pos_args ./../../db_bench
    readwhilewriting use_existing_db=true duration=90
    '''
    parser = argparse.ArgumentParser(description='This script is used for\
        searching for a better database configuration')
    parser.add_argument(
        '--rocksdb_options', required=True, type=str,
        help='path of the starting Rocksdb OPTIONS file'
    )
    parser.add_argument(
        '--base_db_path', required=True, type=str,
        help='path for the Rocksdb database'
    )
    # these are options that are column-family agnostic and are not yet
    # supported by the Rocksdb Options file: eg. bloom_bits=2
    parser.add_argument(
        '--misc_options', nargs='*',
        help='whitespace-separated list of options that are not supported ' +
        'by the Rocksdb OPTIONS file, given in the ' +
        '<option_name>=<option_value> format eg. "bloom_bits=2 ' +
        'rate_limiter_bytes_per_sec=128000000"')
    parser.add_argument(
        '--rules_spec', required=True, type=str,
        help='path of the file containing the expert-specified Rules'
    )
    parser.add_argument(
        '--stats_dump_period_sec', required=True, type=int,
        help='the frequency (in seconds) at which STATISTICS are printed to ' +
        'the Rocksdb LOG file'
    )
    # ODS arguments (all optional; ods_key_prefix is only used when both
    # ods_client and ods_entity are given — see main())
    parser.add_argument(
        '--ods_client', type=str, help='the ODS client binary'
    )
    parser.add_argument(
        '--ods_entity', type=str,
        help='the servers for which the ODS stats need to be fetched'
    )
    parser.add_argument(
        '--ods_key_prefix', type=str,
        help='the prefix that needs to be attached to the keys of time ' +
        'series to be fetched from ODS'
    )
    # benchrunner_module example: advisor.db_benchmark_client
    parser.add_argument(
        '--benchrunner_module', required=True, type=str,
        help='the module containing the BenchmarkRunner class to be used by ' +
        'the Optimizer, example: advisor.db_bench_runner'
    )
    # benchrunner_class example: DBBenchRunner
    parser.add_argument(
        '--benchrunner_class', required=True, type=str,
        help='the name of the BenchmarkRunner class to be used by the ' +
        'Optimizer, should be present in the module provided in the ' +
        'benchrunner_module argument, example: DBBenchRunner'
    )
    parser.add_argument(
        '--benchrunner_pos_args', nargs='*',
        help='whitespace-separated positional arguments that are passed on ' +
        'to the constructor of the BenchmarkRunner class provided in the ' +
        'benchrunner_class argument, example: "use_existing_db=true ' +
        'duration=900"'
    )
    args = parser.parse_args()
    main(args)
@ -0,0 +1,312 @@ |
||||
from advisor.bench_runner import BenchmarkRunner |
||||
from advisor.db_log_parser import DataSource, DatabaseLogs, NO_COL_FAMILY |
||||
from advisor.db_options_parser import DatabaseOptions |
||||
from advisor.db_stats_fetcher import ( |
||||
LogStatsParser, OdsStatsFetcher, DatabasePerfContext |
||||
) |
||||
import os |
||||
import re |
||||
import shutil |
||||
import subprocess |
||||
import time |
||||
|
||||
|
||||
'''
NOTE: This is not thread-safe, because the output file is simply overwritten.
'''


class DBBenchRunner(BenchmarkRunner):
    """BenchmarkRunner implementation that drives the db_bench tool.

    Runs db_bench with a given DatabaseOptions configuration, captures its
    stdout/stderr into temp files, and parses throughput, the DB path and
    (optionally) the PERF_CONTEXT stats out of the captured output.
    """
    OUTPUT_FILE = "temp/dbbench_out.tmp"
    ERROR_FILE = "temp/dbbench_err.tmp"
    DB_PATH = "DB path"          # marker for the line carrying the db directory
    THROUGHPUT = "ops/sec"       # token that follows the throughput value
    PERF_CON = " PERF_CONTEXT:"  # marker for the start of the perf-context dump

    @staticmethod
    def is_metric_better(new_metric, old_metric):
        # for db_bench 'throughput' is the metric returned by run_experiment;
        # higher (or equal) throughput counts as better
        return new_metric >= old_metric

    @staticmethod
    def get_opt_args_str(misc_options_dict):
        """Return ' --<name>=<value>' command-line args for every truthy value."""
        optional_args_str = ""
        for option_name, option_value in misc_options_dict.items():
            if option_value:
                optional_args_str += (
                    " --" + option_name + "=" + str(option_value)
                )
        return optional_args_str

    def __init__(self, positional_args, ods_args=None):
        """positional_args: [db_bench binary, benchmark name, extra args...],
        where each extra arg is a string of the form "<option>=<value>".
        ods_args: optional dict with 'client_script', 'entity', 'key_prefix'.
        """
        self.db_bench_binary = positional_args[0]
        self.benchmark = positional_args[1]
        # FIX: default to an empty list (was None) so that
        # _build_experiment_command can always iterate over it without a
        # TypeError when no extra args were supplied
        self.db_bench_args = []
        # TODO(poojam23): move to unittest with method get_available_workloads
        self.supported_benchmarks = None
        if len(positional_args) > 2:
            # options list with each option given as "<option>=<value>"
            self.db_bench_args = positional_args[2:]
        # save ods_args if provided
        self.ods_args = ods_args

    def _parse_output(self, get_perf_context=False):
        '''
        Parse the captured db_bench output (OUTPUT_FILE) into a dict keyed by
        THROUGHPUT, DB_PATH and PERF_CON.

        Sample db_bench output after running 'readwhilewriting' benchmark:
        DB path: [/tmp/rocksdbtest-155919/dbbench]\n
        readwhilewriting : 16.582 micros/op 60305 ops/sec; 4.2 MB/s (3433828\
        of 5427999 found)\n
        PERF_CONTEXT:\n
        user_key_comparison_count = 500466712, block_cache_hit_count = ...\n
        '''
        output = {
            self.THROUGHPUT: None, self.DB_PATH: None, self.PERF_CON: None
        }
        perf_context_begins = False
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                if line.startswith(self.benchmark):
                    print(line)  # print output of db_bench run
                    token_list = line.strip().split()
                    # the throughput value is the token right before 'ops/sec'
                    for ix, token in enumerate(token_list):
                        if token.startswith(self.THROUGHPUT):
                            output[self.THROUGHPUT] = (
                                float(token_list[ix - 1])
                            )
                            break
                elif line.startswith(self.PERF_CON):
                    perf_context_begins = True
                elif get_perf_context and perf_context_begins:
                    # Sample perf_context output:
                    # user_key_comparison_count = 500, block_cache_hit_count =\
                    # 468, block_read_count = 580, block_read_byte = 445, ...
                    token_list = line.strip().split(',')
                    perf_context = {
                        tk.split('=')[0].strip(): tk.split('=')[1].strip()
                        for tk in token_list
                        if tk
                    }
                    # TODO(poojam23): this is a hack and should be replaced
                    # with the timestamp that db_bench will provide per printed
                    # perf_context
                    timestamp = int(time.time())
                    perf_context_ts = {}
                    for stat in perf_context.keys():
                        perf_context_ts[stat] = {
                            timestamp: int(perf_context[stat])
                        }
                    output[self.PERF_CON] = perf_context_ts
                    perf_context_begins = False
                elif line.startswith(self.DB_PATH):
                    # e.g. "DB path: [/tmp/rocksdbtest-155919/dbbench]"
                    output[self.DB_PATH] = (
                        line.split('[')[1].split(']')[0]
                    )
        return output

    def get_log_options(self, db_options, db_path):
        """Return (logs_file_prefix, stats_freq_sec) derived from db_options.

        The LOG location honours DBOptions.db_log_dir when present; otherwise
        it defaults to db_path (see BenchmarkRunner.get_info_log_file_name).
        """
        log_dir_path = None
        stats_freq_sec = None

        # fetch the two options that control LOG placement and stats cadence
        dump_period = 'DBOptions.stats_dump_period_sec'
        log_dir = 'DBOptions.db_log_dir'
        log_options = db_options.get_options([dump_period, log_dir])
        if dump_period in log_options:
            stats_freq_sec = int(log_options[dump_period][NO_COL_FAMILY])
        if log_dir in log_options:
            log_dir_path = log_options[log_dir][NO_COL_FAMILY]

        log_file_name = DBBenchRunner.get_info_log_file_name(
            log_dir_path, db_path
        )

        if not log_dir_path:
            log_dir_path = db_path
        if not log_dir_path.endswith('/'):
            log_dir_path += '/'

        logs_file_prefix = log_dir_path + log_file_name
        return (logs_file_prefix, stats_freq_sec)

    def _get_options_command_line_args_str(self, curr_options):
        '''
        This method uses the provided Rocksdb OPTIONS to create a string of
        command-line arguments for db_bench.
        The --options_file argument is always given and the options that are
        not supported by the OPTIONS file are given as separate arguments.
        '''
        optional_args_str = DBBenchRunner.get_opt_args_str(
            curr_options.get_misc_options()
        )
        # generate an options configuration file
        options_file = curr_options.generate_options_config(nonce='12345')
        optional_args_str += " --options_file=" + options_file
        return optional_args_str

    def _setup_db_before_experiment(self, curr_options, db_path):
        """Recreate the database at db_path with a fillrandom run."""
        # remove destination directory if it already exists; errors are
        # suppressed by ignore_errors=True (the previous try/except OSError
        # around this call was unreachable for that reason)
        shutil.rmtree(db_path, ignore_errors=True)
        command = "%s --benchmarks=fillrandom --db=%s --num=1000000" % (
            self.db_bench_binary, db_path
        )
        args_str = self._get_options_command_line_args_str(curr_options)
        command += args_str
        self._run_command(command)

    def _build_experiment_command(self, curr_options, db_path):
        """Assemble the full db_bench command line for the benchmark run."""
        command = "%s --benchmarks=%s --statistics --perf_level=3 --db=%s" % (
            self.db_bench_binary, self.benchmark, db_path
        )
        args_str = self._get_options_command_line_args_str(curr_options)
        # handle the command-line args passed in the constructor (an empty
        # list when none were given)
        for cmd_line_arg in self.db_bench_args:
            args_str += (" --" + cmd_line_arg)
        command += args_str
        return command

    def _run_command(self, command):
        """Run the shell command, capturing stdout/stderr into the temp files."""
        print('executing... - ' + command)
        # context managers ensure the handles are closed even if the
        # subprocess call raises (the originals were leaked on error)
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def run_experiment(self, db_options, db_path):
        # type: (DatabaseOptions, str) -> (dict, float)
        """Run one full experiment and return (data_sources, throughput)."""
        self._setup_db_before_experiment(db_options, db_path)
        command = self._build_experiment_command(db_options, db_path)
        self._run_command(command)

        parsed_output = self._parse_output(get_perf_context=True)

        # Create the LOGS object: get the log options from the OPTIONS file
        logs_file_prefix, stats_freq_sec = self.get_log_options(
            db_options, parsed_output[self.DB_PATH]
        )
        db_logs = DatabaseLogs(
            logs_file_prefix, db_options.get_column_families()
        )
        # Create the Log STATS object
        db_log_stats = LogStatsParser(logs_file_prefix, stats_freq_sec)
        # Create the PerfContext STATS object
        db_perf_context = DatabasePerfContext(
            parsed_output[self.PERF_CON], 0, False
        )
        data_sources = {
            DataSource.Type.DB_OPTIONS: [db_options],
            DataSource.Type.LOG: [db_logs],
            DataSource.Type.TIME_SERIES: [db_log_stats, db_perf_context]
        }
        # Create the ODS STATS object, only when ODS access was configured
        if self.ods_args:
            data_sources[DataSource.Type.TIME_SERIES].append(OdsStatsFetcher(
                self.ods_args['client_script'],
                self.ods_args['entity'],
                self.ods_args['key_prefix']
            ))
        return data_sources, parsed_output[self.THROUGHPUT]

    # TODO: this method is for testing, shift it out to unit-tests when ready
    def get_available_workloads(self):
        """Parse 'db_bench --help' output to list the supported benchmarks."""
        if not self.supported_benchmarks:
            self.supported_benchmarks = []
            command = '%s --help' % self.db_bench_binary
            self._run_command(command)
            with open(self.OUTPUT_FILE, 'r') as fp:
                start = False
                for line in fp:
                    # benchmarks are listed between the 'available benchmarks'
                    # and 'meta operations' sections of the help text
                    if re.search('available benchmarks', line, re.IGNORECASE):
                        start = True
                        continue
                    elif start:
                        if re.search('meta operations', line, re.IGNORECASE):
                            break
                        benchmark_info = line.strip()
                        if benchmark_info:
                            token_list = benchmark_info.split()
                            # lines look like "<name> -- <description>"
                            if len(token_list) > 2 and token_list[1] == '--':
                                self.supported_benchmarks.append(token_list[0])
            self.supported_benchmarks = sorted(self.supported_benchmarks)
        return self.supported_benchmarks
||||
|
||||
|
||||
# TODO: remove this method, used only for testing
def _print_log_file_status(log_path):
    """Report whether the given LOG file exists on disk (debug aid)."""
    if os.path.isfile(log_path):
        print('log file exists!')
    else:
        print('error: log file does not exist!')


def _run_and_report(db_bench_helper, db_options, db_path, print_ts_freq=False):
    """Run one experiment and dump the returned data sources (debug aid)."""
    # FIX: run_experiment requires the db_path argument; the original calls
    # omitted it and raised TypeError
    data_sources, _ = db_bench_helper.run_experiment(db_options, db_path)
    print(data_sources[DataSource.Type.DB_OPTIONS][0].options_dict)
    print()
    print(data_sources[DataSource.Type.LOG][0].logs_path_prefix)
    _print_log_file_status(data_sources[DataSource.Type.LOG][0].logs_path_prefix)
    print(data_sources[DataSource.Type.LOG][0].column_families)
    print()
    print(data_sources[DataSource.Type.TIME_SERIES][0].logs_file_prefix)
    _print_log_file_status(
        data_sources[DataSource.Type.TIME_SERIES][0].logs_file_prefix
    )
    print(data_sources[DataSource.Type.TIME_SERIES][0].stats_freq_sec)
    print(data_sources[DataSource.Type.TIME_SERIES][1].keys_ts)
    if print_ts_freq:
        print(data_sources[DataSource.Type.TIME_SERIES][1].stats_freq_sec)


def main():
    """Ad-hoc smoke test for DBBenchRunner; relies on hard-coded local paths."""
    # NOTE(review): db_path taken from the module's own sample output
    # ('DB path: [/tmp/rocksdbtest-155919/dbbench]') — confirm for your setup
    db_path = '/tmp/rocksdbtest-155919/dbbench'
    pos_args = [
        '/home/poojamalik/workspace/rocksdb/db_bench',
        'readwhilewriting',
        'use_existing_db=true',
        'duration=10'
    ]
    db_bench_helper = DBBenchRunner(pos_args)
    # populate benchmarks with the available ones in the db_bench tool
    benchmarks = db_bench_helper.get_available_workloads()
    print(benchmarks)
    print()
    options_file = (
        '/home/poojamalik/workspace/rocksdb/tools/advisor/temp/' +
        'OPTIONS_temp.tmp'
    )
    misc_options = ["rate_limiter_bytes_per_sec=1024000", "bloom_bits=2"]
    db_options = DatabaseOptions(options_file, misc_options)
    _run_and_report(db_bench_helper, db_options, db_path)

    # repeat the run without any miscellaneous options
    db_options = DatabaseOptions(options_file, None)
    _run_and_report(db_bench_helper, db_options, db_path, print_ts_freq=True)


if __name__ == "__main__":
    main()
@ -0,0 +1,282 @@ |
||||
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
||||
# This source code is licensed under both the GPLv2 (found in the |
||||
# COPYING file in the root directory) and Apache 2.0 License |
||||
# (found in the LICENSE.Apache file in the root directory). |
||||
|
||||
from advisor.db_log_parser import NO_COL_FAMILY |
||||
from advisor.db_options_parser import DatabaseOptions |
||||
from advisor.rule_parser import Suggestion |
||||
import copy |
||||
import random |
||||
|
||||
|
||||
class ConfigOptimizer:
    """Iteratively improves a Rocksdb configuration.

    Each run() iteration: run the benchmark, feed the resulting data sources
    to the rule parser, pick ONE triggered rule, apply its suggestions to the
    options, and keep or revert ('backtrack') the change depending on whether
    the benchmark metric improved.
    """
    SCOPE = 'scope'
    SUGG_VAL = 'suggested values'

    @staticmethod
    def apply_action_on_value(old_value, action, suggested_values):
        """Return a new option value obtained by applying 'action' to old_value.

        Raises AssertionError (deliberately — callers catch it) when a 'set'
        action, or a missing old_value, has no suggested_values to pick from.
        """
        chosen_sugg_val = None
        if suggested_values:
            # pick one of the expert-suggested values at random
            chosen_sugg_val = random.choice(list(suggested_values))
        new_value = None
        if action is Suggestion.Action.set or not old_value:
            # a 'set' action (or no old value to adjust) requires an explicit
            # suggested value; the assert is caught in improve_db_config
            assert(chosen_sugg_val)
            new_value = chosen_sugg_val
        else:
            # For increase/decrease actions, currently the code tries to make
            # a 30% change in the option's value per iteration. An addend is
            # also present (+2 or -2) to handle the cases when the option's
            # old value was 0 or the final int() conversion suppressed the 30%
            # change made to the option
            old_value = float(old_value)
            mul = 0
            add = 0
            if action is Suggestion.Action.increase:
                if old_value < 0:
                    # increasing a negative value means moving it toward zero
                    mul = 0.7
                    add = 2
                else:
                    mul = 1.3
                    add = 2
            elif action is Suggestion.Action.decrease:
                if old_value < 0:
                    mul = 1.3
                    add = -2
                else:
                    mul = 0.7
                    add = -2
            new_value = int(old_value * mul + add)
        return new_value

    @staticmethod
    def improve_db_config(options, rule, suggestions_dict):
        """Apply all of ONE rule's suggestions; return (current, updated) configs."""
        # this method takes ONE 'rule' and applies all its suggestions on the
        # appropriate options
        required_options = []
        rule_suggestions = []
        for sugg_name in rule.get_suggestions():
            option = suggestions_dict[sugg_name].option
            action = suggestions_dict[sugg_name].action
            # A Suggestion in the rules spec must have the 'option' and
            # 'action' fields defined, always call perform_checks() method
            # after parsing the rules file using RulesSpec
            assert(option)
            assert(action)
            required_options.append(option)
            rule_suggestions.append(suggestions_dict[sugg_name])
        current_config = options.get_options(required_options)
        # Create the updated configuration from the rule's suggestions
        updated_config = {}
        for sugg in rule_suggestions:
            # case: when the option is not present in the current configuration
            if sugg.option not in current_config:
                try:
                    new_value = ConfigOptimizer.apply_action_on_value(
                        None, sugg.action, sugg.suggested_values
                    )
                    if sugg.option not in updated_config:
                        updated_config[sugg.option] = {}
                    if DatabaseOptions.is_misc_option(sugg.option):
                        # this suggestion is on an option that is not yet
                        # supported by the Rocksdb OPTIONS file and so it is
                        # not prefixed by a section type.
                        updated_config[sugg.option][NO_COL_FAMILY] = new_value
                    else:
                        for col_fam in rule.get_trigger_column_families():
                            updated_config[sugg.option][col_fam] = new_value
                except AssertionError:
                    # apply_action_on_value had no suggested value to set
                    print(
                        'WARNING(ConfigOptimizer): provide suggested_values ' +
                        'for ' + sugg.option
                    )
                # option was absent: nothing else to do for this suggestion
                continue
            # case: when the option is present in the current configuration
            if NO_COL_FAMILY in current_config[sugg.option]:
                # a database-wide (column-family agnostic) option
                old_value = current_config[sugg.option][NO_COL_FAMILY]
                try:
                    new_value = ConfigOptimizer.apply_action_on_value(
                        old_value, sugg.action, sugg.suggested_values
                    )
                    if sugg.option not in updated_config:
                        updated_config[sugg.option] = {}
                    updated_config[sugg.option][NO_COL_FAMILY] = new_value
                except AssertionError:
                    print(
                        'WARNING(ConfigOptimizer): provide suggested_values ' +
                        'for ' + sugg.option
                    )
            else:
                # per-column-family option: update it for every column family
                # that triggered the rule
                for col_fam in rule.get_trigger_column_families():
                    old_value = None
                    if col_fam in current_config[sugg.option]:
                        old_value = current_config[sugg.option][col_fam]
                    try:
                        new_value = ConfigOptimizer.apply_action_on_value(
                            old_value, sugg.action, sugg.suggested_values
                        )
                        if sugg.option not in updated_config:
                            updated_config[sugg.option] = {}
                        updated_config[sugg.option][col_fam] = new_value
                    except AssertionError:
                        print(
                            'WARNING(ConfigOptimizer): provide ' +
                            'suggested_values for ' + sugg.option
                        )
        return current_config, updated_config

    @staticmethod
    def pick_rule_to_apply(rules, last_rule_name, rules_tried, backtrack):
        """Choose the next rule to apply, or None when no candidate remains."""
        if not rules:
            print('\nNo more rules triggered!')
            return None
        # if the last rule provided an improvement in the database performance,
        # and it was triggered again (i.e. it is present in 'rules'), then pick
        # the same rule for this iteration too.
        if last_rule_name and not backtrack:
            for rule in rules:
                if rule.name == last_rule_name:
                    return rule
        # there was no previous rule OR the previous rule did not improve db
        # performance OR it was not triggered for this iteration,
        # then pick another rule that has not been tried yet
        for rule in rules:
            if rule.name not in rules_tried:
                return rule
        print('\nAll rules have been exhausted')
        return None

    @staticmethod
    def apply_suggestions(
        triggered_rules,
        current_rule_name,
        rules_tried,
        backtrack,
        curr_options,
        suggestions_dict
    ):
        """Pick a rule and compute its config update.

        Returns (rule, rules_tried, current_config, updated_config);
        recurses on a fresh rule when the picked rule changes nothing.
        Returns (None, None, None, None) when no rule is applicable.
        """
        curr_rule = ConfigOptimizer.pick_rule_to_apply(
            triggered_rules, current_rule_name, rules_tried, backtrack
        )
        if not curr_rule:
            return tuple([None]*4)
        # if a rule has been picked for improving db_config, update rules_tried
        rules_tried.add(curr_rule.name)
        # get updated config based on the picked rule
        curr_conf, updated_conf = ConfigOptimizer.improve_db_config(
            curr_options, curr_rule, suggestions_dict
        )
        conf_diff = DatabaseOptions.get_options_diff(curr_conf, updated_conf)
        if not conf_diff:  # the current and updated configs are the same
            # try another rule (passing None as the last rule name so a
            # not-yet-tried rule is picked)
            curr_rule, rules_tried, curr_conf, updated_conf = (
                ConfigOptimizer.apply_suggestions(
                    triggered_rules,
                    None,
                    rules_tried,
                    backtrack,
                    curr_options,
                    suggestions_dict
                )
            )
        # NOTE(review): leftover debug print — consider removing
        print('returning from apply_suggestions')
        return (curr_rule, rules_tried, curr_conf, updated_conf)

    # TODO(poojam23): check if this method is required or can we directly set
    # the config equal to the curr_config
    @staticmethod
    def get_backtrack_config(curr_config, updated_config):
        """Build the config that undoes updated_config, restoring old values."""
        diff = DatabaseOptions.get_options_diff(curr_config, updated_config)
        bt_config = {}
        for option in diff:
            bt_config[option] = {}
            for col_fam in diff[option]:
                # diff values are (old, new) pairs; index 0 is the old value
                bt_config[option][col_fam] = diff[option][col_fam][0]
        # NOTE(review): leftover debug print — consider removing
        print(bt_config)
        return bt_config

    def __init__(self, bench_runner, db_options, rule_parser, base_db):
        # BenchmarkRunner used to measure each candidate configuration
        self.bench_runner = bench_runner
        # starting DatabaseOptions configuration
        self.db_options = db_options
        # RulesSpec providing the expert rules and suggestions
        self.rule_parser = rule_parser
        # path of the database the benchmark runs against
        self.base_db_path = base_db

    def run(self):
        """Run the optimization loop; return the final DatabaseOptions."""
        # In every iteration of this method's optimization loop we pick ONE
        # RULE from all the triggered rules and apply all its suggestions to
        # the appropriate options.
        # bootstrapping the optimizer
        print('Bootstrapping optimizer:')
        options = copy.deepcopy(self.db_options)
        old_data_sources, old_metric = (
            self.bench_runner.run_experiment(options, self.base_db_path)
        )
        print('Initial metric: ' + str(old_metric))
        self.rule_parser.load_rules_from_spec()
        self.rule_parser.perform_section_checks()
        triggered_rules = self.rule_parser.get_triggered_rules(
            old_data_sources, options.get_column_families()
        )
        print('\nTriggered:')
        self.rule_parser.print_rules(triggered_rules)
        backtrack = False
        rules_tried = set()
        curr_rule, rules_tried, curr_conf, updated_conf = (
            ConfigOptimizer.apply_suggestions(
                triggered_rules,
                None,
                rules_tried,
                backtrack,
                options,
                self.rule_parser.get_suggestions_dict()
            )
        )
        # the optimizer loop
        while curr_rule:
            print('\nRule picked for next iteration:')
            print(curr_rule.name)
            print('\ncurrent config:')
            print(curr_conf)
            print('updated config:')
            print(updated_conf)
            options.update_options(updated_conf)
            # run bench_runner with updated config
            new_data_sources, new_metric = (
                self.bench_runner.run_experiment(options, self.base_db_path)
            )
            print('\nnew metric: ' + str(new_metric))
            backtrack = not self.bench_runner.is_metric_better(
                new_metric, old_metric
            )
            # update triggered_rules, metric, data_sources, if required
            if backtrack:
                # revert changes to options config
                print('\nBacktracking to previous configuration')
                backtrack_conf = ConfigOptimizer.get_backtrack_config(
                    curr_conf, updated_conf
                )
                options.update_options(backtrack_conf)
            else:
                # run advisor on new data sources
                self.rule_parser.load_rules_from_spec()  # reboot the advisor
                self.rule_parser.perform_section_checks()
                triggered_rules = self.rule_parser.get_triggered_rules(
                    new_data_sources, options.get_column_families()
                )
                print('\nTriggered:')
                self.rule_parser.print_rules(triggered_rules)
                old_metric = new_metric
                old_data_sources = new_data_sources
                # fresh iteration: no rule has been tried against this config
                rules_tried = set()
            # pick rule to work on and set curr_rule to that
            curr_rule, rules_tried, curr_conf, updated_conf = (
                ConfigOptimizer.apply_suggestions(
                    triggered_rules,
                    curr_rule.name,
                    rules_tried,
                    backtrack,
                    options,
                    self.rule_parser.get_suggestions_dict()
                )
            )
        # return the final database options configuration
        return options
@ -0,0 +1,421 @@ |
||||
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
||||
# This source code is licensed under both the GPLv2 (found in the |
||||
# COPYING file in the root directory) and Apache 2.0 License |
||||
# (found in the LICENSE.Apache file in the root directory). |
||||
|
||||
from advisor.db_log_parser import Log |
||||
from advisor.db_timeseries_parser import TimeSeriesData, NO_ENTITY |
||||
from advisor.rule_parser import Condition, TimeSeriesCondition |
||||
import copy |
||||
import glob |
||||
import re |
||||
import subprocess |
||||
import time |
||||
|
||||
|
||||
class LogStatsParser(TimeSeriesData):
    """Generates timeseries of Rocksdb statistics by parsing its LOG files.

    The DB dumps its statistics into the LOG periodically (every
    stats_freq_sec seconds) as a multi-line message whose first line contains
    'STATISTICS:'; each subsequent line reports one statistic.
    """
    STATS = 'STATISTICS:'

    @staticmethod
    def parse_log_line_for_stats(log_line):
        """Parse one statistics line into Dict[stat_name, float value].

        Example stat line (from LOG file):
        "rocksdb.db.get.micros P50 : 8.4 P95 : 21.8 P99 : 33.9 P100 : 92.0\n"
        yields:
        {'rocksdb.db.get.micros.p50': 8.4, 'rocksdb.db.get.micros.p95': 21.8,
         'rocksdb.db.get.micros.p99': 33.9, 'rocksdb.db.get.micros.p100': 92.0}
        """
        token_list = log_line.strip().split()
        if not token_list:
            # fix: a blank line previously raised IndexError on token_list[0];
            # treat it as a line carrying no statistics
            return {}
        # token_list = ['rocksdb.db.get.micros', 'P50', ':', '8.4', 'P95', ':',
        # '21.8', 'P99', ':', '33.9', 'P100', ':', '92.0']
        stat_prefix = token_list[0] + '.'  # 'rocksdb.db.get.micros.'
        stat_values = [
            token
            for token in token_list[1:]
            if token != ':'
        ]
        # stat_values = ['P50', '8.4', 'P95', '21.8', 'P99', '33.9', 'P100',
        # '92.0'] -- alternating name-suffix / value pairs
        stat_dict = {}
        for ix, metric in enumerate(stat_values):
            if ix % 2 == 0:
                stat_name = stat_prefix + metric
                stat_name = stat_name.lower()  # Note: case insensitive names
            else:
                stat_dict[stat_name] = float(metric)
        return stat_dict

    def __init__(self, logs_path_prefix, stats_freq_sec):
        """logs_path_prefix: prefix matching the LOG file(s) to parse;
        stats_freq_sec: frequency (seconds) at which the DB dumps statistics.
        """
        super().__init__()
        self.logs_file_prefix = logs_path_prefix
        self.stats_freq_sec = stats_freq_sec
        self.duration_sec = 60

    def get_keys_from_conditions(self, conditions):
        """Collect the (lower-cased) stat names required by 'conditions'."""
        # Note: case insensitive stat names
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                key = key.lower()
                # some keys are prepended with '[]' for OdsStatsFetcher to
                # replace this with the appropriate key_prefix; remove these
                # characters here since the LogStatsParser does not need
                # a prefix
                if key.startswith('[]'):
                    reqd_stats.append(key[2:])
                else:
                    reqd_stats.append(key)
        return reqd_stats

    def add_to_timeseries(self, log, reqd_stats):
        """Extract the stats in 'reqd_stats' from one STATISTICS Log object
        and record them in keys_ts at the log's timestamp.

        Example: reqd_stats = ['rocksdb.block.cache.hit.count',
        'rocksdb.db.get.micros.p99'] and log.get_message() returns:
        "[WARN] [db/db_impl.cc:485] STATISTICS:\n
         rocksdb.block.cache.miss COUNT : 1459\n
         rocksdb.block.cache.hit COUNT : 37\n
         ...
         rocksdb.db.get.micros P50 : 15.6 P95 : 39.7 P99 : 62.6 P100 : 148.0\n
         ..."
        with log_ts = 1532518219, this updates:
        keys_ts[NO_ENTITY]['rocksdb.db.get.micros.p99'][1532518219] = 62.6
        keys_ts[NO_ENTITY]['rocksdb.block.cache.hit.count'][1532518219] = 37
        """
        new_lines = log.get_message().split('\n')
        log_ts = log.get_timestamp()
        for line in new_lines[1:]:  # new_lines[0] does not contain any stats
            stats_on_line = self.parse_log_line_for_stats(line)
            for stat in stats_on_line:
                if stat in reqd_stats:
                    if stat not in self.keys_ts[NO_ENTITY]:
                        self.keys_ts[NO_ENTITY][stat] = {}
                    self.keys_ts[NO_ENTITY][stat][log_ts] = stats_on_line[stat]

    def fetch_timeseries(self, reqd_stats):
        """Parse the Rocksdb LOG file(s) matching logs_file_prefix and build a
        timeseries for each statistic in 'reqd_stats'."""
        self.keys_ts = {NO_ENTITY: {}}
        for file_name in glob.glob(self.logs_file_prefix + '*'):
            # TODO(poojam23): find a way to distinguish between 'old' log files
            # from current and previous experiments, present in the same
            # directory
            if re.search('old', file_name, re.IGNORECASE):
                continue
            with open(file_name, 'r') as db_logs:
                new_log = None
                for line in db_logs:
                    if Log.is_new_log(line):
                        if (
                            new_log and
                            re.search(self.STATS, new_log.get_message())
                        ):
                            self.add_to_timeseries(new_log, reqd_stats)
                        new_log = Log(line, column_families=[])
                    elif new_log:
                        # To account for logs split into multiple lines.
                        # fix: guard against a continuation line appearing
                        # before any log header, which crashed on None before
                        new_log.append_message(line)
                # Check for the last log in the file.
                if new_log and re.search(self.STATS, new_log.get_message()):
                    self.add_to_timeseries(new_log, reqd_stats)
||||
|
||||
|
||||
class DatabasePerfContext(TimeSeriesData):
    # TODO(poojam23): check if any benchrunner provides PerfContext sampled at
    # regular intervals
    def __init__(self, perf_context_ts, stats_freq_sec=0, cumulative=True):
        '''
        perf_context_ts format: Dict[metric, Dict[timestamp, value]], where
        the value for each (metric, timestamp) pair is database-wide (i.e.
        already summed over all the participating threads).
        A stats_freq_sec of 0 means only a single value per metric.
        If cumulative is True the counters are running totals and are
        converted to per-interval values on construction.
        '''
        super().__init__()
        self.stats_freq_sec = stats_freq_sec
        self.keys_ts = {NO_ENTITY: perf_context_ts}
        if cumulative:
            self.unaccumulate_metrics()

    def unaccumulate_metrics(self):
        # Convert cumulative counters into disjoint per-interval values by
        # replacing each sample with its delta from the previous sample.
        delta_ts = copy.deepcopy(self.keys_ts)
        for metric in self.keys_ts[NO_ENTITY]:
            stamps_desc = sorted(
                list(self.keys_ts[NO_ENTITY][metric].keys()), reverse=True
            )
            if len(stamps_desc) < 2:
                continue
            for pos, stamp in enumerate(stamps_desc[:-1]):
                prev_stamp = stamps_desc[pos + 1]
                delta = (
                    delta_ts[NO_ENTITY][metric][stamp] -
                    delta_ts[NO_ENTITY][metric][prev_stamp]
                )
                if delta < 0:
                    raise ValueError('DBPerfContext: really cumulative?')
                delta_ts[NO_ENTITY][metric][stamp] = delta
            # the oldest sample has no predecessor to diff against; drop it
            delta_ts[NO_ENTITY][metric].pop(stamps_desc[-1])
        self.keys_ts = delta_ts

    def get_keys_from_conditions(self, conditions):
        # Note: case insensitive metric names
        required = []
        for cond in conditions:
            required.extend(key.lower() for key in cond.keys)
        return required

    def fetch_timeseries(self, statistics):
        # nothing to do here: __init__ already populated 'keys_ts'
        pass
||||
|
||||
|
||||
class OdsStatsFetcher(TimeSeriesData):
    """Fetches statistics timeseries from the ODS service through an external
    client binary (either 'rapido' or the plain ODS client)."""
    # class constants
    OUTPUT_FILE = 'temp/stats_out.tmp'
    ERROR_FILE = 'temp/stats_err.tmp'
    RAPIDO_COMMAND = "%s --entity=%s --key=%s --tstart=%s --tend=%s --showtime"
    ODS_COMMAND = '%s %s %s'  # client, entities, keys

    # static methods
    @staticmethod
    def _get_string_in_quotes(value):
        # wrap the value in double quotes for use on a shell command line
        return '"' + str(value) + '"'

    @staticmethod
    def _get_time_value_pair(pair_string):
        """Parse a '[timestamp, value]' fragment into [int, float].

        example pair_string: '[1532544591, 97.3653601828]'
        """
        pair_string = pair_string.replace('[', '')
        pair_string = pair_string.replace(']', '')
        pair = pair_string.split(',')
        first = int(pair[0].strip())
        second = float(pair[1].strip())
        return [first, second]

    def __init__(self, client, entities, key_prefix=None):
        """client: the stats-fetching binary (rapido or ods client);
        entities: entity name(s) to query; key_prefix: optional prefix for
        keys marked with '[]' in the rules spec."""
        super().__init__()
        self.client = client
        self.entities = entities
        self.key_prefix = key_prefix
        self.stats_freq_sec = 60
        self.duration_sec = 60
        # Fetch last 3 hours data by default
        self.end_time = int(time.time())
        self.start_time = self.end_time - (3 * 60 * 60)

    def execute_script(self, command):
        """Run 'command' in a shell, capturing stdout/stderr to temp files."""
        print('executing...')
        print(command)
        # fix: context managers guarantee the temp files are closed even if
        # subprocess.call raises (the original leaked both handles on error)
        with open(self.OUTPUT_FILE, "w+") as out_file, \
                open(self.ERROR_FILE, "w+") as err_file:
            # NOTE(review): shell=True on an interpolated command string is
            # only acceptable for trusted, locally-configured client/entity
            # values; never route untrusted input through this method
            subprocess.call(
                command, shell=True, stdout=out_file, stderr=err_file
            )

    def parse_rapido_output(self):
        """Populate keys_ts from rapido's output file.

        Output looks like the following:
        <entity_name>\t<key_name>\t[[ts, value], [ts, value], ...]
        ts = timestamp; value = value of key_name in entity_name at time ts
        """
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.strip().split('\t')
                entity = token_list[0]
                key = token_list[1]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                if key not in self.keys_ts[entity]:
                    self.keys_ts[entity][key] = {}
                list_of_lists = [
                    self._get_time_value_pair(pair_string)
                    for pair_string in token_list[2].split('],')
                ]
                value = {pair[0]: pair[1] for pair in list_of_lists}
                self.keys_ts[entity][key] = value

    def parse_ods_output(self):
        """Populate keys_ts from the ODS client's output file.

        Output looks like the following:
        <entity_name>\t<key_name>\t<timestamp>\t<value>
        there is one line per (entity_name, key_name, timestamp)
        """
        self.keys_ts = {}
        with open(self.OUTPUT_FILE, 'r') as fp:
            for line in fp:
                token_list = line.split()
                entity = token_list[0]
                if entity not in self.keys_ts:
                    self.keys_ts[entity] = {}
                key = token_list[1]
                if key not in self.keys_ts[entity]:
                    self.keys_ts[entity][key] = {}
                # NOTE(review): timestamp and value are stored as raw strings
                # here, whereas parse_rapido_output stores int/float -- confirm
                # downstream consumers handle both representations
                self.keys_ts[entity][key][token_list[2]] = token_list[3]

    def fetch_timeseries(self, statistics):
        """Fetch the timeseries of 'statistics' from ODS and populate
        keys_ts; the client name decides which tool/format is used."""
        print('OdsStatsFetcher: fetching ' + str(statistics))
        if re.search('rapido', self.client, re.IGNORECASE):
            command = self.RAPIDO_COMMAND % (
                self.client,
                self._get_string_in_quotes(self.entities),
                self._get_string_in_quotes(','.join(statistics)),
                self._get_string_in_quotes(self.start_time),
                self._get_string_in_quotes(self.end_time)
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_rapido_output()
        elif re.search('ods', self.client, re.IGNORECASE):
            command = self.ODS_COMMAND % (
                self.client,
                self._get_string_in_quotes(self.entities),
                self._get_string_in_quotes(','.join(statistics))
            )
            # Run the tool and fetch the time-series data
            self.execute_script(command)
            # Parse output and populate the 'keys_ts' map
            self.parse_ods_output()

    def get_keys_from_conditions(self, conditions):
        """Expand each condition's keys into the exact ODS key names:
        strip the '[]' marker, append the '.60' suffix for rocksdb stats,
        and apply key_prefix where the marker asked for it."""
        reqd_stats = []
        for cond in conditions:
            for key in cond.keys:
                use_prefix = False
                if key.startswith('[]'):
                    use_prefix = True
                    key = key[2:]
                # TODO(poojam23): this is very hacky and needs to be improved
                if key.startswith("rocksdb"):
                    key += ".60"
                if use_prefix:
                    if not self.key_prefix:
                        print('Warning: OdsStatsFetcher might need key prefix')
                        print('for the key: ' + key)
                    else:
                        key = self.key_prefix + "." + key
                reqd_stats.append(key)
        return reqd_stats

    def fetch_rate_url(self, entities, keys, window_len, percent, display):
        # type: (List[str], List[str], str, str, bool) -> str
        """Ask rapido for a chart URL of the rate() transform over 'keys'."""
        transform_desc = (
            "rate(" + str(window_len) + ",duration=" + str(self.duration_sec)
        )
        if percent:
            transform_desc = transform_desc + ",%)"
        else:
            transform_desc = transform_desc + ")"

        command = self.RAPIDO_COMMAND + " --transform=%s --url=%s"
        command = command % (
            self.client,
            self._get_string_in_quotes(','.join(entities)),
            self._get_string_in_quotes(','.join(keys)),
            self._get_string_in_quotes(self.start_time),
            self._get_string_in_quotes(self.end_time),
            self._get_string_in_quotes(transform_desc),
            self._get_string_in_quotes(display)
        )
        self.execute_script(command)
        url = ""
        with open(self.OUTPUT_FILE, 'r') as fp:
            url = fp.readline()
        return url
||||
|
||||
|
||||
# TODO(poojam23): remove these blocks once the unittests for LogStatsParser are |
||||
# in place |
||||
def main():
    # Ad-hoc driver exercising LogStatsParser end-to-end; prints intermediate
    # state so the results can be inspected manually.
    log_stats = LogStatsParser('temp/db_stats_fetcher_main_LOG.tmp', 20)
    print(log_stats.type)
    print(log_stats.keys_ts)
    print(log_stats.logs_file_prefix)
    print(log_stats.stats_freq_sec)
    print(log_stats.duration_sec)
    statistics = [
        'rocksdb.number.rate_limiter.drains.count',
        'rocksdb.number.block.decompressed.count',
        'rocksdb.db.get.micros.p50',
        'rocksdb.manifest.file.sync.micros.p99',
        'rocksdb.db.get.micros.p99'
    ]
    log_stats.fetch_timeseries(statistics)
    print()
    print(log_stats.keys_ts)
    # aggregated statistics
    print()
    aggregation_ops = [
        TimeSeriesData.AggregationOperator.latest,
        TimeSeriesData.AggregationOperator.oldest,
        TimeSeriesData.AggregationOperator.max,
        TimeSeriesData.AggregationOperator.min,
        TimeSeriesData.AggregationOperator.avg,
    ]
    for agg_op in aggregation_ops:
        print(log_stats.fetch_aggregated_values(
            NO_ENTITY, statistics, agg_op
        ))
    # condition 'evaluate_expression' that evaluates to true
    cond1 = TimeSeriesCondition.create(Condition('cond-1'))
    cond1.set_parameter('keys', statistics)
    cond1.set_parameter('behavior', 'evaluate_expression')
    cond1.set_parameter('evaluate', 'keys[3]-keys[2]>=0')
    cond1.set_parameter('aggregation_op', 'avg')
    # condition 'evaluate_expression' that evaluates to false
    cond2 = TimeSeriesCondition.create(Condition('cond-2'))
    cond2.set_parameter('keys', statistics)
    cond2.set_parameter('behavior', 'evaluate_expression')
    cond2.set_parameter('evaluate', '((keys[1]-(2*keys[0]))/100)<3000')
    cond2.set_parameter('aggregation_op', 'latest')
    # condition 'evaluate_expression' that evaluates to true; no aggregation_op
    cond3 = TimeSeriesCondition.create(Condition('cond-3'))
    cond3.set_parameter('keys', [statistics[2], statistics[3]])
    cond3.set_parameter('behavior', 'evaluate_expression')
    cond3.set_parameter('evaluate', '(keys[1]/keys[0])>23')
    # check remaining methods
    conditions = [cond1, cond2, cond3]
    print()
    print(log_stats.get_keys_from_conditions(conditions))
    log_stats.check_and_trigger_conditions(conditions)
    print()
    for cond in conditions:
        print(cond.get_trigger())
||||
|
||||
|
||||
# TODO(poojam23): shift this code to the unit tests for DatabasePerfContext |
||||
def check_perf_context_code():
    # Synthesizes a small cumulative PerfContext timeseries from a sample dump
    # string and checks that DatabasePerfContext un-accumulates it.
    raw = (
        " user_key_comparison_count = 675903942, " +
        "block_cache_hit_count = 830086, " +
        "get_from_output_files_time = 85088293818, " +
        "seek_on_memtable_time = 0,"
    )
    perf_context = {}
    for chunk in raw.split(','):
        if not chunk:
            continue
        name, _, value = chunk.partition('=')
        perf_context[name.strip()] = int(value.strip())
    base_ts = int(time.time())
    perf_ts = {}
    for metric, start_val in perf_context.items():
        # five samples, 10 seconds apart, each growing by 2 (cumulative)
        perf_ts[metric] = {
            base_ts + (ix * 10): start_val + (2 * ix)
            for ix in range(5)
        }
    db_perf_context = DatabasePerfContext(perf_ts, 10, True)
    print(db_perf_context.keys_ts)
||||
|
||||
|
||||
if __name__ == '__main__':
    # ad-hoc drivers, kept until proper unit tests replace them
    main()
    check_perf_context_code()
@ -0,0 +1,208 @@ |
||||
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
||||
# This source code is licensed under both the GPLv2 (found in the |
||||
# COPYING file in the root directory) and Apache 2.0 License |
||||
# (found in the LICENSE.Apache file in the root directory). |
||||
|
||||
from abc import abstractmethod |
||||
from advisor.db_log_parser import DataSource |
||||
from enum import Enum |
||||
import math |
||||
|
||||
|
||||
NO_ENTITY = 'ENTITY_PLACEHOLDER' |
||||
|
||||
|
||||
class TimeSeriesData(DataSource):
    """Base class for DataSources whose statistics come as timeseries.

    Subclasses must populate 'keys_ts' in the form:
    Dict[entity, Dict[key, Dict[timestamp, value]]]
    NOTE(review): fetch_burst_epochs reads self.duration_sec, which is set by
    LogStatsParser and OdsStatsFetcher but not by this base class -- confirm
    it is set before using 'bursty' conditions with other subclasses.
    """
    class Behavior(Enum):
        # ways a timeseries condition can be checked
        bursty = 1
        evaluate_expression = 2

    class AggregationOperator(Enum):
        # supported aggregations over a statistic's timeseries
        avg = 1
        max = 2
        min = 3
        latest = 4
        oldest = 5

    def __init__(self):
        super().__init__(DataSource.Type.TIME_SERIES)
        self.keys_ts = None  # Dict[entity, Dict[key, Dict[timestamp, value]]]
        self.stats_freq_sec = None  # sampling period; 0 means no timeseries

    @abstractmethod
    def get_keys_from_conditions(self, conditions):
        # This method takes in a list of time-series conditions; for each
        # condition it manipulates the 'keys' in the way that is supported by
        # the subclass implementing this method
        pass

    @abstractmethod
    def fetch_timeseries(self, required_statistics):
        # this method takes in a list of statistics and fetches the timeseries
        # for each of them and populates the 'keys_ts' dictionary
        pass

    def fetch_burst_epochs(
        self, entities, statistic, window_sec, threshold, percent
    ):
        # type: (str, int, float, bool) -> Dict[str, Dict[int, float]]
        # this method calculates the (percent) rate change in the 'statistic'
        # for each entity (over 'window_sec' seconds) and returns the epochs
        # where this rate change is greater than or equal to the 'threshold'
        # value
        if self.stats_freq_sec == 0:
            # not time series data, cannot check for bursty behavior
            return
        if window_sec < self.stats_freq_sec:
            window_sec = self.stats_freq_sec
        # 'window_samples' is the number of windows to go back to
        # compare the current window with, while calculating rate change.
        window_samples = math.ceil(window_sec / self.stats_freq_sec)
        burst_epochs = {}
        # if percent = False:
        # curr_val = value at window for which rate change is being calculated
        # prev_val = value at window that is window_samples behind curr_window
        # Then rate_without_percent =
        # ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
        # if percent = True:
        # rate_with_percent = (rate_without_percent * 100) / prev_val
        # These calculations are in line with the rate() transform supported
        # by ODS
        for entity in entities:
            if statistic not in self.keys_ts[entity]:
                continue
            timestamps = sorted(list(self.keys_ts[entity][statistic].keys()))
            for ix in range(window_samples, len(timestamps), 1):
                first_ts = timestamps[ix - window_samples]
                last_ts = timestamps[ix]
                first_val = self.keys_ts[entity][statistic][first_ts]
                last_val = self.keys_ts[entity][statistic][last_ts]
                diff = last_val - first_val
                if percent:
                    # NOTE(review): raises ZeroDivisionError when first_val is
                    # 0 -- confirm callers guarantee a non-zero baseline
                    diff = diff * 100 / first_val
                rate = (diff * self.duration_sec) / (last_ts - first_ts)
                # if the rate change is greater than the provided threshold,
                # then the condition is triggered for entity at time 'last_ts'
                if rate >= threshold:
                    if entity not in burst_epochs:
                        burst_epochs[entity] = {}
                    burst_epochs[entity][last_ts] = rate
        return burst_epochs

    def fetch_aggregated_values(self, entity, statistics, aggregation_op):
        # type: (str, AggregationOperator) -> Dict[str, float]
        # this method performs the aggregation specified by 'aggregation_op'
        # on the timeseries of 'statistics' for 'entity' and returns:
        # Dict[statistic, aggregated_value]
        # statistics missing from keys_ts[entity] are silently skipped
        result = {}
        for stat in statistics:
            if stat not in self.keys_ts[entity]:
                continue
            agg_val = None
            if aggregation_op is self.AggregationOperator.latest:
                latest_timestamp = max(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][latest_timestamp]
            elif aggregation_op is self.AggregationOperator.oldest:
                oldest_timestamp = min(list(self.keys_ts[entity][stat].keys()))
                agg_val = self.keys_ts[entity][stat][oldest_timestamp]
            elif aggregation_op is self.AggregationOperator.max:
                agg_val = max(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.min:
                agg_val = min(list(self.keys_ts[entity][stat].values()))
            elif aggregation_op is self.AggregationOperator.avg:
                values = list(self.keys_ts[entity][stat].values())
                agg_val = sum(values) / len(values)
            result[stat] = agg_val
        return result

    def check_and_trigger_conditions(self, conditions):
        # Fetches all required stats, then evaluates each condition (bursty or
        # evaluate_expression) and sets its trigger when it fires.
        # get the list of statistics that need to be fetched
        reqd_keys = self.get_keys_from_conditions(conditions)
        # fetch the required statistics and populate the map 'keys_ts'
        self.fetch_timeseries(reqd_keys)
        # Trigger the appropriate conditions
        for cond in conditions:
            complete_keys = self.get_keys_from_conditions([cond])
            # Get the entities that have all statistics required by 'cond':
            # an entity is checked for a given condition only if we possess all
            # of the condition's 'keys' for that entity
            entities_with_stats = []
            for entity in self.keys_ts:
                stat_missing = False
                for stat in complete_keys:
                    if stat not in self.keys_ts[entity]:
                        stat_missing = True
                        break
                if not stat_missing:
                    entities_with_stats.append(entity)
            if not entities_with_stats:
                continue
            if cond.behavior is self.Behavior.bursty:
                # for a condition that checks for bursty behavior, only one key
                # should be present in the condition's 'keys' field
                result = self.fetch_burst_epochs(
                    entities_with_stats,
                    complete_keys[0],  # there should be only one key
                    cond.window_sec,
                    cond.rate_threshold,
                    True
                )
                # Trigger in this case is:
                # Dict[entity_name, Dict[timestamp, rate_change]]
                # where the inner dictionary contains rate_change values when
                # the rate_change >= threshold provided, with the
                # corresponding timestamps
                if result:
                    cond.set_trigger(result)
            elif cond.behavior is self.Behavior.evaluate_expression:
                self.handle_evaluate_expression(
                    cond,
                    complete_keys,
                    entities_with_stats
                )

    def handle_evaluate_expression(self, condition, statistics, entities):
        # Evaluates the condition's 'expression' for each entity, either on
        # aggregated values or per-timestamp; sets the condition's trigger if
        # the expression held anywhere.
        trigger = {}
        # check 'condition' for each of these entities
        for entity in entities:
            if hasattr(condition, 'aggregation_op'):
                # in this case, the aggregation operation is performed on each
                # of the condition's 'keys' and then with aggregated values
                # condition's 'expression' is evaluated; if it evaluates to
                # True, then list of the keys values is added to the
                # condition's trigger: Dict[entity_name, List[stats]]
                result = self.fetch_aggregated_values(
                    entity, statistics, condition.aggregation_op
                )
                # the local name 'keys' is load-bearing: the condition's
                # 'expression' from the rules spec references it, e.g.
                # 'keys[3]-keys[2]>=0' -- do not rename it
                keys = [result[key] for key in statistics]
                try:
                    # NOTE(review): eval() executes arbitrary code from the
                    # rules spec file; only trusted spec files must be used
                    if eval(condition.expression):
                        trigger[entity] = keys
                except Exception as e:
                    print(
                        'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
                    )
            else:
                # assumption: all stats have same series of timestamps
                # this is similar to the above but 'expression' is evaluated at
                # each timestamp, since there is no aggregation, and all the
                # epochs are added to the trigger when the condition's
                # 'expression' evaluated to true; so trigger is:
                # Dict[entity, Dict[timestamp, List[stats]]]
                for epoch in self.keys_ts[entity][statistics[0]].keys():
                    # 'keys' is referenced by the eval'd expression (see above)
                    keys = [
                        self.keys_ts[entity][key][epoch]
                        for key in statistics
                    ]
                    try:
                        if eval(condition.expression):
                            if entity not in trigger:
                                trigger[entity] = {}
                            trigger[entity][epoch] = keys
                    except Exception as e:
                        print(
                            'WARNING(TimeSeriesData) check_and_trigger: ' +
                            str(e)
                        )
        if trigger:
            condition.set_trigger(trigger)
@ -0,0 +1,83 @@ |
||||
[Rule "stall-too-many-memtables"] |
||||
suggestions=inc-bg-flush:inc-write-buffer |
||||
conditions=stall-too-many-memtables |
||||
|
||||
[Condition "stall-too-many-memtables"] |
||||
source=LOG |
||||
regex=Stopping writes because we have \d+ immutable memtables \(waiting for flush\), max_write_buffer_number is set to \d+ |
||||
|
||||
[Rule "stall-too-many-L0"] |
||||
suggestions=inc-max-subcompactions:inc-max-bg-compactions:inc-write-buffer-size:dec-max-bytes-for-level-base:inc-l0-slowdown-writes-trigger |
||||
conditions=stall-too-many-L0 |
||||
|
||||
[Condition "stall-too-many-L0"] |
||||
source=LOG |
||||
regex=Stalling writes because we have \d+ level-0 files |
||||
|
||||
[Rule "stop-too-many-L0"] |
||||
suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-l0-stop-writes-trigger |
||||
conditions=stop-too-many-L0 |
||||
|
||||
[Condition "stop-too-many-L0"] |
||||
source=LOG |
||||
regex=Stopping writes because we have \d+ level-0 files |
||||
|
||||
[Rule "stall-too-many-compaction-bytes"] |
||||
suggestions=inc-max-bg-compactions:inc-write-buffer-size:inc-hard-pending-compaction-bytes-limit:inc-soft-pending-compaction-bytes-limit |
||||
conditions=stall-too-many-compaction-bytes |
||||
|
||||
[Condition "stall-too-many-compaction-bytes"] |
||||
source=LOG |
||||
regex=Stalling writes because of estimated pending compaction bytes \d+ |
||||
|
||||
[Suggestion "inc-bg-flush"] |
||||
option=DBOptions.max_background_flushes |
||||
action=increase |
||||
|
||||
[Suggestion "inc-write-buffer"] |
||||
option=CFOptions.max_write_buffer_number |
||||
action=increase |
||||
|
||||
[Suggestion "inc-max-subcompactions"] |
||||
option=DBOptions.max_subcompactions |
||||
action=increase |
||||
|
||||
[Suggestion "inc-max-bg-compactions"] |
||||
option=DBOptions.max_background_compactions |
||||
action=increase |
||||
|
||||
[Suggestion "inc-write-buffer-size"] |
||||
option=CFOptions.write_buffer_size |
||||
action=increase |
||||
|
||||
[Suggestion "dec-max-bytes-for-level-base"] |
||||
option=CFOptions.max_bytes_for_level_base |
||||
action=decrease |
||||
|
||||
[Suggestion "inc-l0-slowdown-writes-trigger"] |
||||
option=CFOptions.level0_slowdown_writes_trigger |
||||
action=increase |
||||
|
||||
[Suggestion "inc-l0-stop-writes-trigger"] |
||||
option=CFOptions.level0_stop_writes_trigger |
||||
action=increase |
||||
|
||||
[Suggestion "inc-hard-pending-compaction-bytes-limit"] |
||||
option=CFOptions.hard_pending_compaction_bytes_limit |
||||
action=increase |
||||
|
||||
[Suggestion "inc-soft-pending-compaction-bytes-limit"] |
||||
option=CFOptions.soft_pending_compaction_bytes_limit |
||||
action=increase |
||||
|
||||
[Rule "level0-level1-ratio"] |
||||
conditions=level0-level1-ratio |
||||
suggestions=l0-l1-ratio-health-check |
||||
|
||||
[Condition "level0-level1-ratio"] |
||||
source=OPTIONS |
||||
options=CFOptions.level0_file_num_compaction_trigger:CFOptions.write_buffer_size:CFOptions.max_bytes_for_level_base |
||||
evaluate=int(options[0])*int(options[1])-int(options[2])>=-268173312 # should evaluate to a boolean, condition triggered if evaluates to true |
||||
|
||||
[Suggestion "l0-l1-ratio-health-check"] |
||||
description='modify options such that (level0_file_num_compaction_trigger * write_buffer_size - max_bytes_for_level_base < -268173312) is satisfied' |
@ -0,0 +1,98 @@ |
||||
from advisor.db_log_parser import DatabaseLogs, Log, NO_COL_FAMILY |
||||
from advisor.rule_parser import Condition, LogCondition |
||||
import os |
||||
import unittest |
||||
|
||||
|
||||
class TestLog(unittest.TestCase):
    """Unit tests for the advisor's Log class."""

    def setUp(self):
        self.column_families = ['default', 'col_fam_A']

    def test_get_column_family(self):
        # a log line explicitly tagged with a known column family
        line_with_cf = (
            "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " +
            "[col_fam_A] [JOB 44] Level-0 flush table #84: 1890780 bytes OK"
        )
        parsed = Log(line_with_cf, self.column_families)
        self.assertEqual('col_fam_A', parsed.get_column_family())

        # no column-family tag on the first line; a tag that only appears in
        # an appended continuation must not be picked up
        line_without_cf = (
            "2018/05/25-14:34:21.047233 7f82ba72e700 [db/flush_job.cc:371] " +
            "[JOB 44] Level-0 flush table #84: 1890780 bytes OK"
        )
        parsed = Log(line_without_cf, self.column_families)
        parsed.append_message('[default] some remaining part of log')
        self.assertEqual(NO_COL_FAMILY, parsed.get_column_family())

    def test_get_methods(self):
        # assemble a full log line from its parts, then check each accessor
        hr_time = "2018/05/25-14:30:25.491635"
        thread_context = "7f82ba72e700"
        body = (
            "[db/flush_job.cc:331] [default] [JOB 10] Level-0 flush table " +
            "#23: started"
        )
        parsed = Log(
            hr_time + " " + thread_context + " " + body,
            self.column_families
        )
        self.assertEqual(parsed.get_message(), body)
        continuation = "[col_fam_A] some more logs"
        parsed.append_message(continuation)
        self.assertEqual(
            parsed.get_human_readable_time(), "2018/05/25-14:30:25.491635"
        )
        self.assertEqual(parsed.get_context(), "7f82ba72e700")
        self.assertEqual(parsed.get_timestamp(), 1527258625)
        self.assertEqual(
            parsed.get_message(), str(body + '\n' + continuation)
        )

    def test_is_new_log(self):
        # a line starting with a full timestamp begins a new log entry;
        # anything else is a continuation
        fresh_line = "2018/05/25-14:34:21.047233 context random new log"
        continuation_line = "2018/05/25 not really a new log"
        self.assertTrue(Log.is_new_log(fresh_line))
        self.assertFalse(Log.is_new_log(continuation_line))
||||
|
||||
|
||||
class TestDatabaseLogs(unittest.TestCase):
    """Tests for DatabaseLogs: scanning LOG files and firing LogConditions."""

    def test_check_and_trigger_conditions(self):
        """Conditions whose regex matches some log line get triggered with
        the matching Log objects grouped by column family; a condition that
        matches nothing remains untriggered.
        """
        base_dir = os.path.abspath(os.path.dirname(__file__))
        log_prefix = os.path.join(base_dir, 'input_files/LOG-0')
        col_fams = ['default', 'col-fam-A', 'col-fam-B']
        db_logs = DatabaseLogs(log_prefix, col_fams)

        # cond-A: matches lines from two different column families.
        cond_a = LogCondition.create(Condition('cond-A'))
        cond_a.set_parameter('regex', 'random log message')
        # cond-B: matches a message that spans multiple lines.
        cond_b = LogCondition.create(Condition('cond-B'))
        cond_b.set_parameter('regex', 'continuing on next line')
        # cond-C: matches no log line at all.
        cond_c = LogCondition.create(Condition('cond-C'))
        cond_c.set_parameter('regex', 'this should match no log')

        db_logs.check_and_trigger_conditions([cond_a, cond_b, cond_c])

        trigger_a = cond_a.get_trigger()
        self.assertEqual(2, len(trigger_a.keys()))
        self.assertSetEqual(
            {'col-fam-A', NO_COL_FAMILY}, set(trigger_a.keys())
        )
        self.assertEqual(2, len(trigger_a['col-fam-A']))
        expected_a = [
            "[db/db_impl.cc:563] [col-fam-A] random log message for testing",
            "[db/db_impl.cc:653] [col-fam-A] another random log message"
        ]
        # Both triggered logs for col-fam-A must be among the expected ones.
        for triggered_log in trigger_a['col-fam-A']:
            self.assertIn(triggered_log.get_message(), expected_a)
        self.assertEqual(1, len(trigger_a[NO_COL_FAMILY]))
        self.assertEqual(
            trigger_a[NO_COL_FAMILY][0].get_message(),
            "[db/db_impl.cc:331] [unknown] random log message no column family"
        )

        trigger_b = cond_b.get_trigger()
        self.assertEqual(['col-fam-B'], list(trigger_b.keys()))
        self.assertEqual(1, len(trigger_b['col-fam-B']))
        # The multi-line log is stored as a single '\n'-joined message.
        self.assertEqual(
            trigger_b['col-fam-B'][0].get_message(),
            "[db/db_impl.cc:234] [col-fam-B] log continuing on next line\n"
            "remaining part of the log"
        )

        self.assertIsNone(cond_c.get_trigger())