# Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
#  This source code is licensed under both the GPLv2 (found in the
#  COPYING file in the root directory) and Apache 2.0 License
#  (found in the LICENSE.Apache file in the root directory).

import re
from abc import ABC, abstractmethod
from enum import Enum

from advisor.db_log_parser import DataSource, NO_COL_FAMILY
from advisor.db_timeseries_parser import TimeSeriesData
from advisor.ini_parser import IniParser


class Section(ABC):
    """Base class for a named section (rule/condition/suggestion) of the spec."""

    def __init__(self, name):
        self.name = name

    @abstractmethod
    def set_parameter(self, key, value):
        """Store one `key = value` pair that was read for this section."""

    @abstractmethod
    def perform_checks(self):
        """Validate the section; implementations raise ValueError on bad input."""


class Rule(Section):
    """A rule: a set of conditions that, when all triggered, yield suggestions.

    If `overlap_time_period` is specified, the rule must have exactly two
    TIME_SERIES conditions and is triggered only when both were triggered
    within that many seconds of each other.
    """

    # multipliers used to convert an 'overlap_time_period' unit to seconds
    _SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 60 * 60, "d": 24 * 60 * 60}

    def __init__(self, name):
        super().__init__(name)
        self.conditions = None
        self.suggestions = None
        self.overlap_time_seconds = None
        self.trigger_entities = None
        self.trigger_column_families = None

    def set_parameter(self, key, value):
        """Store the rule's conditions, suggestions or overlap time period.

        Keys other than "conditions", "suggestions" and "overlap_time_period"
        are ignored.
        """
        # If the Rule is associated with a single suggestion/condition, then
        # value will be a string and not a list. Hence, convert it to a single
        # element list before storing it in self.suggestions or
        # self.conditions.
        if key == "conditions":
            self.conditions = [value] if isinstance(value, str) else value
        elif key == "suggestions":
            self.suggestions = [value] if isinstance(value, str) else value
        elif key == "overlap_time_period":
            # kept as specified (e.g. "10m"); perform_checks() converts it
            # to an integer number of seconds
            self.overlap_time_seconds = value

    def get_suggestions(self):
        """Return the list of suggestion names associated with this rule."""
        return self.suggestions

    def perform_checks(self):
        """Validate the rule and convert the overlap period to seconds.

        Raises:
            ValueError: if the rule has no condition or no suggestion, if an
                overlap period is given without exactly 2 conditions, or if
                the overlap period is not digits followed by one of s/m/h/d.
        """
        if not self.conditions or len(self.conditions) < 1:
            raise ValueError(self.name + ": rule must have at least one condition")
        if not self.suggestions or len(self.suggestions) < 1:
            raise ValueError(self.name + ": rule must have at least one suggestion")
        if not self.overlap_time_seconds:
            return
        if len(self.conditions) != 2:
            raise ValueError(
                self.name
                + ": rule must be associated with 2 conditions "
                + "in order to check for a time dependency between them"
            )
        if isinstance(self.overlap_time_seconds, int):
            # already converted by an earlier call; keep the check idempotent
            return
        # The unit is a single character; a '|' inside the character class
        # would be accepted as a literal unit, so it must not appear there.
        time_match = re.match(
            r"^(\d+)([smhd])$", self.overlap_time_seconds, re.IGNORECASE
        )
        if not time_match:
            raise ValueError(
                self.name + r": overlap_time_seconds format: \d+[smhd]"
            )
        # The unit is matched case-insensitively, so normalise it before the
        # lookup; otherwise e.g. "5M" would be treated as 5 seconds.
        unit = time_match.group(2).lower()
        self.overlap_time_seconds = (
            int(time_match.group(1)) * self._SECONDS_PER_UNIT[unit]
        )

    def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
        """Find the first pair of trigger epochs that are close enough.

        This method takes in 2 timeseries i.e. timestamps at which the
        rule's 2 TIME_SERIES conditions were triggered and it finds
        (if present) the first pair of timestamps at which the 2 conditions
        were triggered within 'overlap_time_seconds' of each other.

        Args:
            key1_trigger_epochs: epochs at which the first condition triggered.
            key2_trigger_epochs: epochs at which the second condition
                triggered.

        Returns:
            A tuple (key2_epoch, key1_epoch) for the first overlapping pair,
            or None if there is no such pair. The inputs are not modified.
        """
        key1_lower_bounds = sorted(
            epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs
        )
        key2_epochs = sorted(key2_trigger_epochs)
        num_key2_epochs = len(key2_epochs)
        trigger_ix = 0
        for key1_lb in key1_lower_bounds:
            # The bounds check must come first: once every key2 epoch has
            # been skipped, indexing key2_epochs would raise IndexError.
            while (
                trigger_ix < num_key2_epochs
                and key2_epochs[trigger_ix] < key1_lb
            ):
                trigger_ix += 1
            if trigger_ix >= num_key2_epochs:
                break
            # key1_lb + 2 * overlap is the upper bound of the window that is
            # centred on the key1 epoch
            if key2_epochs[trigger_ix] <= key1_lb + (2 * self.overlap_time_seconds):
                return (
                    key2_epochs[trigger_ix],
                    key1_lb + self.overlap_time_seconds,
                )
        return None

    def get_trigger_entities(self):
        """Return the entities recorded by the last is_triggered() call."""
        return self.trigger_entities

    def get_trigger_column_families(self):
        """Return the column families recorded by the last is_triggered()."""
        return self.trigger_column_families

    def is_triggered(self, conditions_dict, column_families):
        """Evaluate the rule against the current triggers of its conditions.

        Args:
            conditions_dict: mapping of condition name to condition object.
            column_families: the column families under consideration.

        Returns:
            True if the rule is triggered. In that case the scope of the
            trigger is available through get_trigger_entities() and
            get_trigger_column_families().

        Raises:
            ValueError: if an overlap period is set but the 2 conditions are
                not both TIME_SERIES conditions.
        """
        # Start from a clean slate so that the scope recorded by a previous
        # evaluation cannot leak into (and wrongly narrow) this one.
        self.trigger_entities = None
        self.trigger_column_families = None
        if self.overlap_time_seconds:
            condition1 = conditions_dict[self.conditions[0]]
            condition2 = conditions_dict[self.conditions[1]]
            if not (
                condition1.get_data_source() is DataSource.Type.TIME_SERIES
                and condition2.get_data_source() is DataSource.Type.TIME_SERIES
            ):
                raise ValueError(self.name + ": need 2 timeseries conditions")

            map1 = condition1.get_trigger()
            map2 = condition2.get_trigger()
            if not (map1 and map2):
                return False

            self.trigger_entities = {}
            is_triggered = False
            # only entities for which both conditions triggered can overlap
            entity_intersection = set(map1.keys()).intersection(set(map2.keys()))
            for entity in entity_intersection:
                overlap_timestamps_pair = self.get_overlap_timestamps(
                    list(map1[entity].keys()), list(map2[entity].keys())
                )
                if overlap_timestamps_pair:
                    self.trigger_entities[entity] = overlap_timestamps_pair
                    is_triggered = True
            if is_triggered:
                self.trigger_column_families = set(column_families)
            return is_triggered
        else:
            all_conditions_triggered = True
            self.trigger_column_families = set(column_families)
            for cond_name in self.conditions:
                cond = conditions_dict[cond_name]
                if not cond.get_trigger():
                    all_conditions_triggered = False
                    break
                if (
                    cond.get_data_source() is DataSource.Type.LOG
                    or cond.get_data_source() is DataSource.Type.DB_OPTIONS
                ):
                    cond_col_fam = set(cond.get_trigger().keys())
                    # a trigger keyed by NO_COL_FAMILY is treated as applying
                    # to every column family
                    if NO_COL_FAMILY in cond_col_fam:
                        cond_col_fam = set(column_families)
                    self.trigger_column_families = (
                        self.trigger_column_families.intersection(cond_col_fam)
                    )
                elif cond.get_data_source() is DataSource.Type.TIME_SERIES:
                    cond_entities = set(cond.get_trigger().keys())
                    if self.trigger_entities is None:
                        self.trigger_entities = cond_entities
                    else:
                        self.trigger_entities = self.trigger_entities.intersection(
                            cond_entities
                        )
                if not (self.trigger_entities or self.trigger_column_families):
                    all_conditions_triggered = False
                    break
            if not all_conditions_triggered:  # clean up if rule not triggered
                self.trigger_column_families = None
                self.trigger_entities = None
            return all_conditions_triggered

    def __repr__(self):
        # Append conditions
        rule_string = "Rule: " + self.name + " has conditions:: "
        rule_string += " AND ".join(self.conditions)
        # Append suggestions
        rule_string += "\nsuggestions:: "
        rule_string += ", ".join(self.suggestions)
        if self.trigger_entities:
            rule_string += ", entities:: " + str(self.trigger_entities)
        if self.trigger_column_families:
            rule_string += ", col_fam:: " + str(self.trigger_column_families)
        # Return constructed string
        return rule_string


class Suggestion(Section):
    """A suggestion: either a free-text description or an action on an option."""

    class Action(Enum):
        set = 1
        increase = 2
        decrease = 3

    def __init__(self, name):
        super().__init__(name)
        self.option = None
        self.action = None
        self.suggested_values = None
        self.description = None

    def set_parameter(self, key, value):
        """Store the option, action, suggested values or description.

        Raises:
            ValueError: if an option was set but the action value is empty.
            KeyError: if the action value is not the name of an Action member.
        """
        if key == "option":
            # Note:
            # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
            # case the option belongs to one of the sections in the config
            # file and its name is given a prefix ending in "."
            # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
            # option is not expected to have the character '.' in its name
            self.option = value
        elif key == "action":
            if self.option and not value:
                raise ValueError(self.name + ": provide action for option")
            self.action = self.Action[value]
        elif key == "suggested_values":
            # a single value arrives as a string; always store a list
            if isinstance(value, str):
                self.suggested_values = [value]
            else:
                self.suggested_values = value
        elif key == "description":
            self.description = value

    def perform_checks(self):
        """Validate the suggestion.

        A suggestion with a description needs nothing else. Otherwise it must
        have an option and an action, and the 'set' action additionally
        requires suggested values.

        Raises:
            ValueError: if any of the above requirements is not met.
        """
        if not self.description:
            if not self.option:
                raise ValueError(self.name + ": provide option or description")
            if not self.action:
                raise ValueError(self.name + ": provide action for option")
            if self.action is self.Action.set and not self.suggested_values:
                raise ValueError(self.name + ": provide suggested value for option")

    def __repr__(self):
        sugg_string = "Suggestion: " + self.name
        if self.description:
            sugg_string += " description : " + self.description
        else:
            sugg_string += " option : " + self.option + " action : " + self.action.name
            if self.suggested_values:
                sugg_string += " suggested_values : " + str(self.suggested_values)
        return sugg_string


class Condition(Section):
    """A condition that gets triggered by data from one type of data source.

    The generic Condition is turned into a LogCondition, OptionCondition or
    TimeSeriesCondition through the `create()` method of those classes.
    """

    def __init__(self, name):
        super().__init__(name)
        self.data_source = None
        self.trigger = None

    def perform_checks(self):
        """Raise ValueError if the condition has no data source."""
        if not self.data_source:
            raise ValueError(self.name + ": condition not tied to data source")

    def set_data_source(self, data_source):
        self.data_source = data_source

    def get_data_source(self):
        return self.data_source

    def reset_trigger(self):
        self.trigger = None

    def set_trigger(self, condition_trigger):
        self.trigger = condition_trigger

    def get_trigger(self):
        return self.trigger

    def is_triggered(self):
        """Return True if a non-empty trigger is stored on the condition."""
        if self.trigger:
            return True
        return False

    def set_parameter(self, key, value):
        # must be defined by the subclass
        raise NotImplementedError(self.name + ": provide source for condition")


class LogCondition(Condition):
    """A condition on the LOG data source, described by a regex."""

    @classmethod
    def create(cls, base_condition):
        """Convert `base_condition` in place to a LogCondition and return it."""
        base_condition.set_data_source(DataSource.Type["LOG"])
        # the class is swapped without running any initializer, so the
        # condition-specific attributes exist only after set_parameter()
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == "regex":
            self.regex = value

    def perform_checks(self):
        """Raise ValueError if the data source or the regex is missing."""
        super().perform_checks()
        # 'regex' does not exist if it was never specified in the spec
        if not getattr(self, "regex", None):
            raise ValueError(self.name + ": provide regex for log condition")

    def __repr__(self):
        log_cond_str = "LogCondition: " + self.name
        log_cond_str += " regex: " + self.regex
        # if self.trigger:
        #     log_cond_str += (" trigger: " + str(self.trigger))
        return log_cond_str


class OptionCondition(Condition):
    """A condition on the DB_OPTIONS data source: options and an expression."""

    @classmethod
    def create(cls, base_condition):
        """Convert `base_condition` in place to an OptionCondition; return it."""
        base_condition.set_data_source(DataSource.Type["DB_OPTIONS"])
        # the class is swapped without running any initializer, so the
        # condition-specific attributes exist only after set_parameter()
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == "options":
            # a single option arrives as a string; always store a list
            if isinstance(value, str):
                self.options = [value]
            else:
                self.options = value
        elif key == "evaluate":
            self.eval_expr = value

    def perform_checks(self):
        """Raise ValueError if data source, options or expression is missing."""
        super().perform_checks()
        # the attributes do not exist if they were never specified in the spec
        if not getattr(self, "options", None):
            raise ValueError(self.name + ": options missing in condition")
        if not getattr(self, "eval_expr", None):
            raise ValueError(self.name + ": expression missing in condition")

    def __repr__(self):
        opt_cond_str = "OptionCondition: " + self.name
        opt_cond_str += " options: " + str(self.options)
        opt_cond_str += " expression: " + self.eval_expr
        if self.trigger:
            opt_cond_str += " trigger: " + str(self.trigger)
        return opt_cond_str


class TimeSeriesCondition(Condition):
    """A condition on the TIME_SERIES data source."""

    @classmethod
    def create(cls, base_condition):
        """Convert `base_condition` in place to a TimeSeriesCondition."""
        base_condition.set_data_source(DataSource.Type["TIME_SERIES"])
        # the class is swapped without running any initializer, so the
        # condition-specific attributes exist only after set_parameter()
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        """Store one time-series parameter, converting it to its proper type.

        Raises:
            KeyError: if 'behavior' or 'aggregation_op' is not the name of a
                member of the corresponding TimeSeriesData enum.
            ValueError: if 'rate_threshold' or 'window_sec' is not numeric.
        """
        if key == "keys":
            # a single key arrives as a string; always store a list
            if isinstance(value, str):
                self.keys = [value]
            else:
                self.keys = value
        elif key == "behavior":
            self.behavior = TimeSeriesData.Behavior[value]
        elif key == "rate_threshold":
            self.rate_threshold = float(value)
        elif key == "window_sec":
            self.window_sec = int(value)
        elif key == "evaluate":
            self.expression = value
        elif key == "aggregation_op":
            self.aggregation_op = TimeSeriesData.AggregationOperator[value]

    def perform_checks(self):
        """Validate the parameters required by the condition's behavior.

        For the 'bursty' behavior, `window_sec` defaults to 300 seconds when
        it was not specified.

        Raises:
            ValueError: if keys or behavior are missing, if a parameter
                required by the behavior is missing, if a bursty condition
                has more than one key, or if the behavior is not supported.
        """
        # The attributes below exist only if the corresponding key was given
        # in the spec, hence the getattr() lookups with a None default.
        if not getattr(self, "keys", None):
            raise ValueError(self.name + ": specify timeseries key")
        behavior = getattr(self, "behavior", None)
        if not behavior:
            raise ValueError(self.name + ": specify triggering behavior")
        if behavior is TimeSeriesData.Behavior.bursty:
            if not getattr(self, "rate_threshold", None):
                raise ValueError(self.name + ": specify rate burst threshold")
            if not getattr(self, "window_sec", None):
                self.window_sec = 300  # default window length is 5 minutes
            if len(self.keys) > 1:
                raise ValueError(self.name + ": specify only one key")
        elif behavior is TimeSeriesData.Behavior.evaluate_expression:
            if not getattr(self, "expression", None):
                raise ValueError(self.name + ": specify evaluation expression")
        else:
            raise ValueError(self.name + ": trigger behavior not supported")

    def __repr__(self):
        ts_cond_str = "TimeSeriesCondition: " + self.name
        ts_cond_str += " statistics: " + str(self.keys)
        ts_cond_str += " behavior: " + self.behavior.name
        if self.behavior is TimeSeriesData.Behavior.bursty:
            ts_cond_str += " rate_threshold: " + str(self.rate_threshold)
            ts_cond_str += " window_sec: " + str(self.window_sec)
        if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
            ts_cond_str += " expression: " + self.expression
            if hasattr(self, "aggregation_op"):
                ts_cond_str += " aggregation_op: " + self.aggregation_op.name
        if self.trigger:
            ts_cond_str += " trigger: " + str(self.trigger)
        return ts_cond_str


class RulesSpec:
    """The rules, conditions and suggestions loaded from a rules spec file."""

    def __init__(self, rules_path):
        self.file_path = rules_path

    def initialise_fields(self):
        """(Re)create the empty name -> section dictionaries."""
        self.rules_dict = {}
        self.conditions_dict = {}
        self.suggestions_dict = {}

    def perform_section_checks(self):
        """Run perform_checks() on every loaded rule, condition, suggestion."""
        for rule in self.rules_dict.values():
            rule.perform_checks()
        for cond in self.conditions_dict.values():
            cond.perform_checks()
        for sugg in self.suggestions_dict.values():
            sugg.perform_checks()

    def load_rules_from_spec(self):
        """Parse the spec file and populate the section dictionaries.

        Each section header starts a new Rule, Condition or Suggestion; the
        key-value lines that follow are passed to the most recent section of
        that type. A condition stays a generic Condition until its 'source'
        key converts it to the matching subclass.
        """
        self.initialise_fields()
        with open(self.file_path, "r") as db_rules:
            curr_section = None
            for line in db_rules:
                line = IniParser.remove_trailing_comment(line)
                if not line:
                    continue
                element = IniParser.get_element(line)
                if element is IniParser.Element.comment:
                    continue
                elif element is not IniParser.Element.key_val:
                    curr_section = element  # it's a new IniParser header
                    section_name = IniParser.get_section_name(line)
                    if element is IniParser.Element.rule:
                        new_rule = Rule(section_name)
                        self.rules_dict[section_name] = new_rule
                    elif element is IniParser.Element.cond:
                        new_cond = Condition(section_name)
                        self.conditions_dict[section_name] = new_cond
                    elif element is IniParser.Element.sugg:
                        new_suggestion = Suggestion(section_name)
                        self.suggestions_dict[section_name] = new_suggestion
                elif element is IniParser.Element.key_val:
                    key, value = IniParser.get_key_value_pair(line)
                    if curr_section is IniParser.Element.rule:
                        new_rule.set_parameter(key, value)
                    elif curr_section is IniParser.Element.cond:
                        if key == "source":
                            # create() converts the object in place, so the
                            # entry stored in conditions_dict is updated too
                            if value == "LOG":
                                new_cond = LogCondition.create(new_cond)
                            elif value == "OPTIONS":
                                new_cond = OptionCondition.create(new_cond)
                            elif value == "TIME_SERIES":
                                new_cond = TimeSeriesCondition.create(new_cond)
                        else:
                            new_cond.set_parameter(key, value)
                    elif curr_section is IniParser.Element.sugg:
                        new_suggestion.set_parameter(key, value)

    def get_rules_dict(self):
        return self.rules_dict

    def get_conditions_dict(self):
        return self.conditions_dict

    def get_suggestions_dict(self):
        return self.suggestions_dict

    def get_triggered_rules(self, data_sources, column_families):
        """Trigger the conditions and return the list of triggered rules.

        Args:
            data_sources: mapping of data source type to the sources of that
                type (see trigger_conditions()).
            column_families: the column families under consideration.
        """
        self.trigger_conditions(data_sources)
        triggered_rules = []
        for rule in self.rules_dict.values():
            if rule.is_triggered(self.conditions_dict, column_families):
                triggered_rules.append(rule)
        return triggered_rules

    def trigger_conditions(self, data_sources):
        """Let every data source check the conditions of its own type.

        Args:
            data_sources: mapping of data source type to an iterable of
                sources; each source must provide
                check_and_trigger_conditions(conditions).
        """
        for source_type in data_sources:
            cond_subset = [
                cond
                for cond in self.conditions_dict.values()
                if cond.get_data_source() is source_type
            ]
            if not cond_subset:
                continue
            for source in data_sources[source_type]:
                source.check_and_trigger_conditions(cond_subset)

    def print_rules(self, rules):
        """Print each rule with its conditions, suggestions and scope."""
        for rule in rules:
            print("\nRule: " + rule.name)
            for cond_name in rule.conditions:
                print(repr(self.conditions_dict[cond_name]))
            for sugg_name in rule.suggestions:
                print(repr(self.suggestions_dict[sugg_name]))
            if rule.trigger_entities:
                print("scope: entities:")
                print(rule.trigger_entities)
            if rule.trigger_column_families:
                print("scope: col_fam:")
                print(rule.trigger_column_families)