# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
# This source code is licensed under both the GPLv2 (found in the
# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).

from abc import ABC, abstractmethod
from enum import Enum
import re

from advisor.db_log_parser import DataSource, NO_COL_FAMILY
from advisor.db_timeseries_parser import TimeSeriesData
from advisor.ini_parser import IniParser


# Accepted format of a rule's 'overlap_time_period', e.g. '30s' or '2h'.
# The unit character is matched case-insensitively.
_OVERLAP_TIME_FORMAT = re.compile(r'\d+[smhd]', re.IGNORECASE)
# Multiplier converting an 'overlap_time_period' unit (lowercased) to seconds.
_SECONDS_PER_UNIT = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}


def _as_list(value):
    """Return ``value`` wrapped in a one-element list if it is a str.

    A key with a single value arrives from the spec as a plain string rather
    than a list; normalising it lets callers always iterate over the result.
    Any non-str value is returned unchanged.
    """
    if isinstance(value, str):
        return [value]
    return value


class Section(ABC):
    """Base class for a named section (rule, condition or suggestion) of a
    rules spec file."""

    def __init__(self, name):
        self.name = name

    @abstractmethod
    def set_parameter(self, key, value):
        """Store the value of one ``key=value`` line of this section."""
        pass

    @abstractmethod
    def perform_checks(self):
        """Validate the section; subclasses raise ValueError when invalid."""
        pass


class Rule(Section):
    """A rule ties one or more conditions to one or more suggestions.

    Without 'overlap_time_period' the rule is triggered when all of its
    conditions are triggered. With it, the rule must have exactly two
    TIME_SERIES conditions, and it is triggered only when both fire within
    that period of each other for a common entity.
    """

    def __init__(self, name):
        super().__init__(name)
        self.conditions = None
        self.suggestions = None
        # Holds the raw spec string (e.g. '10m') until perform_checks()
        # converts it to an int number of seconds.
        self.overlap_time_seconds = None
        self.trigger_entities = None
        self.trigger_column_families = None

    def set_parameter(self, key, value):
        """Store 'conditions', 'suggestions' or 'overlap_time_period'.

        Unknown keys are ignored.
        """
        if key == 'conditions':
            self.conditions = _as_list(value)
        elif key == 'suggestions':
            self.suggestions = _as_list(value)
        elif key == 'overlap_time_period':
            self.overlap_time_seconds = value

    def get_suggestions(self):
        return self.suggestions

    def perform_checks(self):
        """Validate the rule and convert overlap_time_seconds to seconds.

        Raises:
            ValueError: if the rule has no conditions or no suggestions;
                or, when an overlap period is set, if the rule does not have
                exactly 2 conditions or the period is not '<int><s|m|h|d>'.
        """
        if not self.conditions:
            raise ValueError(
                self.name + ': rule must have at least one condition'
            )
        if not self.suggestions:
            raise ValueError(
                self.name + ': rule must have at least one suggestion'
            )
        if self.overlap_time_seconds:
            if len(self.conditions) != 2:
                raise ValueError(
                    self.name + ': rule must be associated with 2 conditions'
                    ' in order to check for a time dependency between them'
                )
            if not _OVERLAP_TIME_FORMAT.fullmatch(self.overlap_time_seconds):
                raise ValueError(
                    self.name + r': overlap_time_seconds format: \d+[s|m|h|d]'
                )
            # The unit was matched case-insensitively, so normalise it before
            # the lookup; otherwise e.g. '5M' would be read as 5 seconds.
            unit = self.overlap_time_seconds[-1].lower()
            self.overlap_time_seconds = (
                int(self.overlap_time_seconds[:-1]) * _SECONDS_PER_UNIT[unit]
            )

    def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
        """Find the first pair of trigger timestamps within the overlap period.

        Takes the timestamps at which the rule's 2 TIME_SERIES conditions
        were triggered. Finds (if present) the first pair in which a key2
        timestamp lies within ``overlap_time_seconds`` of a key1 timestamp.

        Returns:
            ``(key2_epoch, key1_epoch)`` for the first such pair, or None.
            The argument lists are not modified.
        """
        key1_lower_bounds = sorted(
            epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs
        )
        key2_epochs = sorted(key2_trigger_epochs)
        num_key2 = len(key2_epochs)
        trigger_ix = 0
        for key1_lb in key1_lower_bounds:
            # Skip key2 triggers that are too early for this key1 trigger
            # (and, since the bounds are sorted, for every later one too).
            # The bounds check must come first to avoid an IndexError.
            while trigger_ix < num_key2 and key2_epochs[trigger_ix] < key1_lb:
                trigger_ix += 1
            if trigger_ix >= num_key2:
                break
            # key1_lb + overlap is the key1 epoch itself, so this tests
            # key2_epoch <= key1_epoch + overlap.
            if key2_epochs[trigger_ix] <= (
                key1_lb + 2 * self.overlap_time_seconds
            ):
                return (
                    key2_epochs[trigger_ix],
                    key1_lb + self.overlap_time_seconds
                )
        return None

    def get_trigger_entities(self):
        return self.trigger_entities

    def get_trigger_column_families(self):
        return self.trigger_column_families

    def is_triggered(self, conditions_dict, column_families):
        """Return True if this rule is triggered and record its scope.

        Args:
            conditions_dict: maps condition name to its Condition object,
                whose trigger has already been evaluated.
            column_families: iterable of column families to consider
                (presumably column family names; verify against callers).

        On success, ``trigger_entities`` and/or ``trigger_column_families``
        describe where the rule fired.

        Raises:
            ValueError: if the rule uses an overlap period but its two
                conditions are not both TIME_SERIES conditions.
        """
        # Start from a clean slate so results of a previous evaluation
        # cannot leak into (or be intersected with) this one.
        self.trigger_entities = None
        self.trigger_column_families = None
        if self.overlap_time_seconds:
            return self._is_overlap_triggered(
                conditions_dict, column_families
            )
        if self._are_all_conditions_triggered(
            conditions_dict, column_families
        ):
            return True
        # clean up if rule not triggered
        self.trigger_column_families = None
        self.trigger_entities = None
        return False

    def _is_overlap_triggered(self, conditions_dict, column_families):
        """Evaluate a rule whose 2 TIME_SERIES conditions must both fire,
        within overlap_time_seconds of each other, for a common entity."""
        condition1 = conditions_dict[self.conditions[0]]
        condition2 = conditions_dict[self.conditions[1]]
        if not (
            condition1.get_data_source() is DataSource.Type.TIME_SERIES and
            condition2.get_data_source() is DataSource.Type.TIME_SERIES
        ):
            raise ValueError(self.name + ': need 2 timeseries conditions')
        map1 = condition1.get_trigger()
        map2 = condition2.get_trigger()
        if not (map1 and map2):
            return False
        # entity -> (key2_epoch, key1_epoch) of the first overlapping pair
        self.trigger_entities = {}
        for entity in set(map1.keys()).intersection(map2.keys()):
            overlap_timestamps_pair = self.get_overlap_timestamps(
                list(map1[entity].keys()), list(map2[entity].keys())
            )
            if overlap_timestamps_pair:
                self.trigger_entities[entity] = overlap_timestamps_pair
        if not self.trigger_entities:
            return False
        self.trigger_column_families = set(column_families)
        return True

    def _are_all_conditions_triggered(self, conditions_dict, column_families):
        """Return True if every condition is triggered for at least one
        common column family or entity.

        Narrows trigger_column_families (LOG/DB_OPTIONS conditions) and
        trigger_entities (TIME_SERIES conditions) as it goes.
        """
        self.trigger_column_families = set(column_families)
        for cond_name in self.conditions:
            cond = conditions_dict[cond_name]
            trigger = cond.get_trigger()
            if not trigger:
                return False
            data_source = cond.get_data_source()
            if (
                data_source is DataSource.Type.LOG or
                data_source is DataSource.Type.DB_OPTIONS
            ):
                cond_col_fam = set(trigger.keys())
                # A trigger not tied to a specific column family applies
                # to all of them.
                if NO_COL_FAMILY in cond_col_fam:
                    cond_col_fam = set(column_families)
                self.trigger_column_families = (
                    self.trigger_column_families.intersection(cond_col_fam)
                )
            elif data_source is DataSource.Type.TIME_SERIES:
                cond_entities = set(trigger.keys())
                if self.trigger_entities is None:
                    self.trigger_entities = cond_entities
                else:
                    self.trigger_entities = (
                        self.trigger_entities.intersection(cond_entities)
                    )
            if not (self.trigger_entities or self.trigger_column_families):
                return False
        return True

    def __repr__(self):
        rule_string = (
            "Rule: " + self.name + " has conditions:: " +
            " AND ".join(self.conditions) +
            "\nsuggestions:: " + ", ".join(self.suggestions)
        )
        if self.trigger_entities:
            rule_string += (', entities:: ' + str(self.trigger_entities))
        if self.trigger_column_families:
            rule_string += (', col_fam:: ' + str(self.trigger_column_families))
        return rule_string


class Suggestion(Section):
    """A suggested change: either an action on an option, or free text."""

    class Action(Enum):
        set = 1
        increase = 2
        decrease = 3

    def __init__(self, name):
        super().__init__(name)
        self.option = None
        self.action = None
        self.suggested_values = None
        self.description = None

    def set_parameter(self, key, value):
        """Store 'option', 'action', 'suggested_values' or 'description'.

        Raises:
            ValueError: if 'action' is empty while an option has been set.
            KeyError: if 'action' is not a member of Suggestion.Action.
        """
        if key == 'option':
            # Note:
            # case 1: 'option' is supported by Rocksdb OPTIONS file; in this
            # case the option belongs to one of the sections in the config
            # file and its name is prefixed by "<section_type>."
            # case 2: 'option' is not supported by Rocksdb OPTIONS file; the
            # option is not expected to have the character '.' in its name
            self.option = value
        elif key == 'action':
            if self.option and not value:
                raise ValueError(self.name + ': provide action for option')
            self.action = self.Action[value]
        elif key == 'suggested_values':
            self.suggested_values = _as_list(value)
        elif key == 'description':
            self.description = value

    def perform_checks(self):
        """Require either a description, or an option with an action (and
        suggested values when the action is 'set')."""
        if not self.description:
            if not self.option:
                raise ValueError(self.name + ': provide option or description')
            if not self.action:
                raise ValueError(self.name + ': provide action for option')
        if self.action is self.Action.set and not self.suggested_values:
            raise ValueError(
                self.name + ': provide suggested value for option'
            )

    def __repr__(self):
        sugg_string = "Suggestion: " + self.name
        if self.description:
            sugg_string += (' description : ' + self.description)
        else:
            sugg_string += (
                ' option : ' + self.option + ' action : ' + self.action.name
            )
            if self.suggested_values:
                sugg_string += (
                    ' suggested_values : ' + str(self.suggested_values)
                )
        return sugg_string


class Condition(Section):
    """Base condition. It is converted to a source-specific subclass (by
    reassigning ``__class__``) once its 'source' key is read."""

    def __init__(self, name):
        super().__init__(name)
        self.data_source = None
        self.trigger = None

    def perform_checks(self):
        if not self.data_source:
            raise ValueError(self.name + ': condition not tied to data source')

    def set_data_source(self, data_source):
        self.data_source = data_source

    def get_data_source(self):
        return self.data_source

    def reset_trigger(self):
        self.trigger = None

    def set_trigger(self, condition_trigger):
        self.trigger = condition_trigger

    def get_trigger(self):
        return self.trigger

    def is_triggered(self):
        return bool(self.trigger)

    def set_parameter(self, key, value):
        # must be defined by the subclass
        raise NotImplementedError(self.name + ': provide source for condition')


class LogCondition(Condition):
    """Condition matched against LOG lines using a regex."""

    @classmethod
    def create(cls, base_condition):
        """Convert ``base_condition`` in place into a LogCondition."""
        base_condition.set_data_source(DataSource.Type['LOG'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == 'regex':
            self.regex = value

    def perform_checks(self):
        super().perform_checks()
        # The attribute exists only if the spec provided the key, because
        # instances are created by __class__ reassignment, not __init__.
        if not getattr(self, 'regex', None):
            raise ValueError(self.name + ': provide regex for log condition')

    def __repr__(self):
        log_cond_str = "LogCondition: " + self.name
        log_cond_str += (" regex: " + self.regex)
        return log_cond_str


class OptionCondition(Condition):
    """Condition that evaluates an expression over DB_OPTIONS values."""

    @classmethod
    def create(cls, base_condition):
        """Convert ``base_condition`` in place into an OptionCondition."""
        base_condition.set_data_source(DataSource.Type['DB_OPTIONS'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == 'options':
            self.options = _as_list(value)
        elif key == 'evaluate':
            self.eval_expr = value

    def perform_checks(self):
        super().perform_checks()
        # Attributes exist only if the spec provided the keys (see
        # LogCondition.perform_checks).
        if not getattr(self, 'options', None):
            raise ValueError(self.name + ': options missing in condition')
        if not getattr(self, 'eval_expr', None):
            raise ValueError(self.name + ': expression missing in condition')

    def __repr__(self):
        opt_cond_str = "OptionCondition: " + self.name
        opt_cond_str += (" options: " + str(self.options))
        opt_cond_str += (" expression: " + self.eval_expr)
        if self.trigger:
            opt_cond_str += (" trigger: " + str(self.trigger))
        return opt_cond_str


class TimeSeriesCondition(Condition):
    """Condition evaluated over TIME_SERIES statistics."""

    @classmethod
    def create(cls, base_condition):
        """Convert ``base_condition`` in place into a TimeSeriesCondition."""
        base_condition.set_data_source(DataSource.Type['TIME_SERIES'])
        base_condition.__class__ = cls
        return base_condition

    def set_parameter(self, key, value):
        if key == 'keys':
            self.keys = _as_list(value)
        elif key == 'behavior':
            self.behavior = TimeSeriesData.Behavior[value]
        elif key == 'rate_threshold':
            self.rate_threshold = float(value)
        elif key == 'window_sec':
            self.window_sec = int(value)
        elif key == 'evaluate':
            self.expression = value
        elif key == 'aggregation_op':
            self.aggregation_op = TimeSeriesData.AggregationOperator[value]

    def perform_checks(self):
        """Validate the condition and default window_sec to 300 for
        'bursty' conditions.

        Attributes exist only if the spec provided the matching key
        (instances are created by __class__ reassignment), hence getattr.
        """
        super().perform_checks()
        keys = getattr(self, 'keys', None)
        behavior = getattr(self, 'behavior', None)
        if not keys:
            raise ValueError(self.name + ': specify timeseries key')
        if not behavior:
            raise ValueError(self.name + ': specify triggering behavior')
        if behavior is TimeSeriesData.Behavior.bursty:
            if not getattr(self, 'rate_threshold', None):
                raise ValueError(self.name + ': specify rate burst threshold')
            if not getattr(self, 'window_sec', None):
                self.window_sec = 300  # default window length is 5 minutes
            if len(keys) > 1:
                raise ValueError(self.name + ': specify only one key')
        elif behavior is TimeSeriesData.Behavior.evaluate_expression:
            if not getattr(self, 'expression', None):
                raise ValueError(self.name + ': specify evaluation expression')
        else:
            raise ValueError(self.name + ': trigger behavior not supported')

    def __repr__(self):
        ts_cond_str = "TimeSeriesCondition: " + self.name
        ts_cond_str += (" statistics: " + str(self.keys))
        ts_cond_str += (" behavior: " + self.behavior.name)
        if self.behavior is TimeSeriesData.Behavior.bursty:
            ts_cond_str += (" rate_threshold: " + str(self.rate_threshold))
            ts_cond_str += (" window_sec: " + str(self.window_sec))
        if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
            ts_cond_str += (" expression: " + self.expression)
            if hasattr(self, 'aggregation_op'):
                ts_cond_str += (" aggregation_op: " + self.aggregation_op.name)
        if self.trigger:
            ts_cond_str += (" trigger: " + str(self.trigger))
        return ts_cond_str


class RulesSpec:
    """Loads rules, conditions and suggestions from an INI-style spec file
    and evaluates them against data sources."""

    def __init__(self, rules_path):
        self.file_path = rules_path

    def initialise_fields(self):
        self.rules_dict = {}
        self.conditions_dict = {}
        self.suggestions_dict = {}

    def perform_section_checks(self):
        """Run perform_checks() on every loaded rule, condition and
        suggestion; the first invalid section raises ValueError."""
        for rule in self.rules_dict.values():
            rule.perform_checks()
        for cond in self.conditions_dict.values():
            cond.perform_checks()
        for sugg in self.suggestions_dict.values():
            sugg.perform_checks()

    def load_rules_from_spec(self):
        """Parse the spec file into rules_dict, conditions_dict and
        suggestions_dict, each keyed by section name.

        A condition's 'source' key (LOG, OPTIONS or TIME_SERIES) converts it
        into the matching Condition subclass. Any other condition key read
        before 'source' raises NotImplementedError.
        """
        self.initialise_fields()
        with open(self.file_path, 'r') as db_rules:
            curr_section = None
            for line in db_rules:
                line = IniParser.remove_trailing_comment(line)
                if not line:
                    continue
                element = IniParser.get_element(line)
                if element is IniParser.Element.comment:
                    continue
                elif element is not IniParser.Element.key_val:
                    curr_section = element  # it's a new IniParser header
                    section_name = IniParser.get_section_name(line)
                    if element is IniParser.Element.rule:
                        new_rule = Rule(section_name)
                        self.rules_dict[section_name] = new_rule
                    elif element is IniParser.Element.cond:
                        new_cond = Condition(section_name)
                        self.conditions_dict[section_name] = new_cond
                    elif element is IniParser.Element.sugg:
                        new_suggestion = Suggestion(section_name)
                        self.suggestions_dict[section_name] = new_suggestion
                else:
                    key, value = IniParser.get_key_value_pair(line)
                    if curr_section is IniParser.Element.rule:
                        new_rule.set_parameter(key, value)
                    elif curr_section is IniParser.Element.cond:
                        if key == 'source':
                            # create() converts the object in place, so the
                            # entry already in conditions_dict stays valid.
                            if value == 'LOG':
                                new_cond = LogCondition.create(new_cond)
                            elif value == 'OPTIONS':
                                new_cond = OptionCondition.create(new_cond)
                            elif value == 'TIME_SERIES':
                                new_cond = TimeSeriesCondition.create(new_cond)
                        else:
                            new_cond.set_parameter(key, value)
                    elif curr_section is IniParser.Element.sugg:
                        new_suggestion.set_parameter(key, value)

    def get_rules_dict(self):
        return self.rules_dict

    def get_conditions_dict(self):
        return self.conditions_dict

    def get_suggestions_dict(self):
        return self.suggestions_dict

    def get_triggered_rules(self, data_sources, column_families):
        """Trigger conditions against ``data_sources`` and return the list
        of rules that are triggered."""
        self.trigger_conditions(data_sources)
        return [
            rule for rule in self.rules_dict.values()
            if rule.is_triggered(self.conditions_dict, column_families)
        ]

    def trigger_conditions(self, data_sources):
        """Let each data source evaluate the conditions of its type.

        ``data_sources`` maps a DataSource.Type to an iterable of sources,
        each exposing check_and_trigger_conditions().
        """
        for source_type in data_sources:
            cond_subset = [
                cond
                for cond in self.conditions_dict.values()
                if cond.get_data_source() is source_type
            ]
            if not cond_subset:
                continue
            for source in data_sources[source_type]:
                source.check_and_trigger_conditions(cond_subset)

    def print_rules(self, rules):
        for rule in rules:
            print('\nRule: ' + rule.name)
            for cond_name in rule.conditions:
                print(repr(self.conditions_dict[cond_name]))
            for sugg_name in rule.suggestions:
                print(repr(self.suggestions_dict[sugg_name]))
            if rule.trigger_entities:
                print('scope: entities:')
                print(rule.trigger_entities)
            if rule.trigger_column_families:
                print('scope: col_fam:')
                print(rule.trigger_column_families)