You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/tools/advisor/advisor/db_timeseries_parser.py

203 lines
9.2 KiB

# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).
import math
from abc import abstractmethod
from enum import Enum
from typing import Dict
from advisor.db_log_parser import DataSource
NO_ENTITY = "ENTITY_PLACEHOLDER"
class TimeSeriesData(DataSource):
class Behavior(Enum):
bursty = 1
evaluate_expression = 2
class AggregationOperator(Enum):
avg = 1
max = 2
min = 3
latest = 4
oldest = 5
def __init__(self):
super().__init__(DataSource.Type.TIME_SERIES)
self.keys_ts = None # Dict[entity, Dict[key, Dict[timestamp, value]]]
self.stats_freq_sec = None
@abstractmethod
def get_keys_from_conditions(self, conditions):
# This method takes in a list of time-series conditions; for each
# condition it manipulates the 'keys' in the way that is supported by
# the subclass implementing this method
pass
@abstractmethod
def fetch_timeseries(self, required_statistics):
# this method takes in a list of statistics and fetches the timeseries
# for each of them and populates the 'keys_ts' dictionary
pass
def fetch_burst_epochs(
self,
entities: str,
statistic: int,
window_sec: float,
threshold: bool,
percent: bool,
) -> Dict[str, Dict[int, float]]:
# this method calculates the (percent) rate change in the 'statistic'
# for each entity (over 'window_sec' seconds) and returns the epochs
# where this rate change is greater than or equal to the 'threshold'
# value
if self.stats_freq_sec == 0:
# not time series data, cannot check for bursty behavior
return
if window_sec < self.stats_freq_sec:
window_sec = self.stats_freq_sec
# 'window_samples' is the number of windows to go back to
# compare the current window with, while calculating rate change.
window_samples = math.ceil(window_sec / self.stats_freq_sec)
burst_epochs = {}
# if percent = False:
# curr_val = value at window for which rate change is being calculated
# prev_val = value at window that is window_samples behind curr_window
# Then rate_without_percent =
# ((curr_val-prev_val)*duration_sec)/(curr_timestamp-prev_timestamp)
# if percent = True:
# rate_with_percent = (rate_without_percent * 100) / prev_val
# These calculations are in line with the rate() transform supported
# by ODS
for entity in entities:
if statistic not in self.keys_ts[entity]:
continue
timestamps = sorted(list(self.keys_ts[entity][statistic].keys()))
for ix in range(window_samples, len(timestamps), 1):
first_ts = timestamps[ix - window_samples]
last_ts = timestamps[ix]
first_val = self.keys_ts[entity][statistic][first_ts]
last_val = self.keys_ts[entity][statistic][last_ts]
diff = last_val - first_val
if percent:
diff = diff * 100 / first_val
rate = (diff * self.duration_sec) / (last_ts - first_ts)
# if the rate change is greater than the provided threshold,
# then the condition is triggered for entity at time 'last_ts'
if rate >= threshold:
if entity not in burst_epochs:
burst_epochs[entity] = {}
burst_epochs[entity][last_ts] = rate
return burst_epochs
def fetch_aggregated_values(self, entity, statistics, aggregation_op):
# this method performs the aggregation specified by 'aggregation_op'
# on the timeseries of 'statistics' for 'entity' and returns:
# Dict[statistic, aggregated_value]
result = {}
for stat in statistics:
if stat not in self.keys_ts[entity]:
continue
agg_val = None
if aggregation_op is self.AggregationOperator.latest:
latest_timestamp = max(list(self.keys_ts[entity][stat].keys()))
agg_val = self.keys_ts[entity][stat][latest_timestamp]
elif aggregation_op is self.AggregationOperator.oldest:
oldest_timestamp = min(list(self.keys_ts[entity][stat].keys()))
agg_val = self.keys_ts[entity][stat][oldest_timestamp]
elif aggregation_op is self.AggregationOperator.max:
agg_val = max(list(self.keys_ts[entity][stat].values()))
elif aggregation_op is self.AggregationOperator.min:
agg_val = min(list(self.keys_ts[entity][stat].values()))
elif aggregation_op is self.AggregationOperator.avg:
values = list(self.keys_ts[entity][stat].values())
agg_val = sum(values) / len(values)
result[stat] = agg_val
return result
def check_and_trigger_conditions(self, conditions):
# get the list of statistics that need to be fetched
reqd_keys = self.get_keys_from_conditions(conditions)
# fetch the required statistics and populate the map 'keys_ts'
self.fetch_timeseries(reqd_keys)
# Trigger the appropriate conditions
for cond in conditions:
complete_keys = self.get_keys_from_conditions([cond])
# Get the entities that have all statistics required by 'cond':
# an entity is checked for a given condition only if we possess all
# of the condition's 'keys' for that entity
entities_with_stats = []
for entity in self.keys_ts:
stat_missing = False
for stat in complete_keys:
if stat not in self.keys_ts[entity]:
stat_missing = True
break
if not stat_missing:
entities_with_stats.append(entity)
if not entities_with_stats:
continue
if cond.behavior is self.Behavior.bursty:
# for a condition that checks for bursty behavior, only one key
# should be present in the condition's 'keys' field
result = self.fetch_burst_epochs(
entities_with_stats,
complete_keys[0], # there should be only one key
cond.window_sec,
cond.rate_threshold,
True,
)
# Trigger in this case is:
# Dict[entity_name, Dict[timestamp, rate_change]]
# where the inner dictionary contains rate_change values when
# the rate_change >= threshold provided, with the
# corresponding timestamps
if result:
cond.set_trigger(result)
elif cond.behavior is self.Behavior.evaluate_expression:
self.handle_evaluate_expression(
cond, complete_keys, entities_with_stats
)
def handle_evaluate_expression(self, condition, statistics, entities):
trigger = {}
# check 'condition' for each of these entities
for entity in entities:
if hasattr(condition, "aggregation_op"):
# in this case, the aggregation operation is performed on each
# of the condition's 'keys' and then with aggregated values
# condition's 'expression' is evaluated; if it evaluates to
# True, then list of the keys values is added to the
# condition's trigger: Dict[entity_name, List[stats]]
result = self.fetch_aggregated_values(
entity, statistics, condition.aggregation_op
)
keys = [result[key] for key in statistics]
try:
if eval(condition.expression):
trigger[entity] = keys
except Exception as e:
print("WARNING(TimeSeriesData) check_and_trigger: " + str(e))
else:
# assumption: all stats have same series of timestamps
# this is similar to the above but 'expression' is evaluated at
# each timestamp, since there is no aggregation, and all the
# epochs are added to the trigger when the condition's
# 'expression' evaluated to true; so trigger is:
# Dict[entity, Dict[timestamp, List[stats]]]
for epoch in self.keys_ts[entity][statistics[0]].keys():
keys = [self.keys_ts[entity][key][epoch] for key in statistics]
try:
if eval(condition.expression):
if entity not in trigger:
trigger[entity] = {}
trigger[entity][epoch] = keys
except Exception as e:
print("WARNING(TimeSeriesData) check_and_trigger: " + str(e))
if trigger:
condition.set_trigger(trigger)