fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
510 lines
20 KiB
510 lines
20 KiB
# Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
# This source code is licensed under both the GPLv2 (found in the
|
|
# COPYING file in the root directory) and Apache 2.0 License
|
|
# (found in the LICENSE.Apache file in the root directory).
|
|
|
|
import re
|
|
from abc import ABC, abstractmethod
|
|
from enum import Enum
|
|
|
|
from advisor.db_log_parser import DataSource, NO_COL_FAMILY
|
|
from advisor.db_timeseries_parser import TimeSeriesData
|
|
from advisor.ini_parser import IniParser
|
|
|
|
|
|
class Section(ABC):
|
|
def __init__(self, name):
|
|
self.name = name
|
|
|
|
@abstractmethod
|
|
def set_parameter(self, key, value):
|
|
pass
|
|
|
|
@abstractmethod
|
|
def perform_checks(self):
|
|
pass
|
|
|
|
|
|
class Rule(Section):
|
|
def __init__(self, name):
|
|
super().__init__(name)
|
|
self.conditions = None
|
|
self.suggestions = None
|
|
self.overlap_time_seconds = None
|
|
self.trigger_entities = None
|
|
self.trigger_column_families = None
|
|
|
|
def set_parameter(self, key, value):
|
|
# If the Rule is associated with a single suggestion/condition, then
|
|
# value will be a string and not a list. Hence, convert it to a single
|
|
# element list before storing it in self.suggestions or
|
|
# self.conditions.
|
|
if key == "conditions":
|
|
if isinstance(value, str):
|
|
self.conditions = [value]
|
|
else:
|
|
self.conditions = value
|
|
elif key == "suggestions":
|
|
if isinstance(value, str):
|
|
self.suggestions = [value]
|
|
else:
|
|
self.suggestions = value
|
|
elif key == "overlap_time_period":
|
|
self.overlap_time_seconds = value
|
|
|
|
def get_suggestions(self):
|
|
return self.suggestions
|
|
|
|
def perform_checks(self):
|
|
if not self.conditions or len(self.conditions) < 1:
|
|
raise ValueError(self.name + ": rule must have at least one condition")
|
|
if not self.suggestions or len(self.suggestions) < 1:
|
|
raise ValueError(self.name + ": rule must have at least one suggestion")
|
|
if self.overlap_time_seconds:
|
|
if len(self.conditions) != 2:
|
|
raise ValueError(
|
|
self.name
|
|
+ ": rule must be associated with 2 conditions\
|
|
in order to check for a time dependency between them"
|
|
)
|
|
time_format = "^\d+[s|m|h|d]$" # noqa
|
|
if not re.match(time_format, self.overlap_time_seconds, re.IGNORECASE):
|
|
raise ValueError(
|
|
self.name + ": overlap_time_seconds format: \d+[s|m|h|d]"
|
|
)
|
|
else: # convert to seconds
|
|
in_seconds = int(self.overlap_time_seconds[:-1])
|
|
if self.overlap_time_seconds[-1] == "m":
|
|
in_seconds *= 60
|
|
elif self.overlap_time_seconds[-1] == "h":
|
|
in_seconds *= 60 * 60
|
|
elif self.overlap_time_seconds[-1] == "d":
|
|
in_seconds *= 24 * 60 * 60
|
|
self.overlap_time_seconds = in_seconds
|
|
|
|
def get_overlap_timestamps(self, key1_trigger_epochs, key2_trigger_epochs):
|
|
# this method takes in 2 timeseries i.e. timestamps at which the
|
|
# rule's 2 TIME_SERIES conditions were triggered and it finds
|
|
# (if present) the first pair of timestamps at which the 2 conditions
|
|
# were triggered within 'overlap_time_seconds' of each other
|
|
key1_lower_bounds = [
|
|
epoch - self.overlap_time_seconds for epoch in key1_trigger_epochs
|
|
]
|
|
key1_lower_bounds.sort()
|
|
key2_trigger_epochs.sort()
|
|
trigger_ix = 0
|
|
overlap_pair = None
|
|
for key1_lb in key1_lower_bounds:
|
|
while key2_trigger_epochs[trigger_ix] < key1_lb and trigger_ix < len(
|
|
key2_trigger_epochs
|
|
):
|
|
trigger_ix += 1
|
|
if trigger_ix >= len(key2_trigger_epochs):
|
|
break
|
|
if key2_trigger_epochs[trigger_ix] <= key1_lb + (
|
|
2 * self.overlap_time_seconds
|
|
):
|
|
overlap_pair = (
|
|
key2_trigger_epochs[trigger_ix],
|
|
key1_lb + self.overlap_time_seconds,
|
|
)
|
|
break
|
|
return overlap_pair
|
|
|
|
def get_trigger_entities(self):
|
|
return self.trigger_entities
|
|
|
|
def get_trigger_column_families(self):
|
|
return self.trigger_column_families
|
|
|
|
def is_triggered(self, conditions_dict, column_families):
|
|
if self.overlap_time_seconds:
|
|
condition1 = conditions_dict[self.conditions[0]]
|
|
condition2 = conditions_dict[self.conditions[1]]
|
|
if not (
|
|
condition1.get_data_source() is DataSource.Type.TIME_SERIES
|
|
and condition2.get_data_source() is DataSource.Type.TIME_SERIES
|
|
):
|
|
raise ValueError(self.name + ": need 2 timeseries conditions")
|
|
|
|
map1 = condition1.get_trigger()
|
|
map2 = condition2.get_trigger()
|
|
if not (map1 and map2):
|
|
return False
|
|
|
|
self.trigger_entities = {}
|
|
is_triggered = False
|
|
entity_intersection = set(map1.keys()).intersection(set(map2.keys()))
|
|
for entity in entity_intersection:
|
|
overlap_timestamps_pair = self.get_overlap_timestamps(
|
|
list(map1[entity].keys()), list(map2[entity].keys())
|
|
)
|
|
if overlap_timestamps_pair:
|
|
self.trigger_entities[entity] = overlap_timestamps_pair
|
|
is_triggered = True
|
|
if is_triggered:
|
|
self.trigger_column_families = set(column_families)
|
|
return is_triggered
|
|
else:
|
|
all_conditions_triggered = True
|
|
self.trigger_column_families = set(column_families)
|
|
for cond_name in self.conditions:
|
|
cond = conditions_dict[cond_name]
|
|
if not cond.get_trigger():
|
|
all_conditions_triggered = False
|
|
break
|
|
if (
|
|
cond.get_data_source() is DataSource.Type.LOG
|
|
or cond.get_data_source() is DataSource.Type.DB_OPTIONS
|
|
):
|
|
cond_col_fam = set(cond.get_trigger().keys())
|
|
if NO_COL_FAMILY in cond_col_fam:
|
|
cond_col_fam = set(column_families)
|
|
self.trigger_column_families = (
|
|
self.trigger_column_families.intersection(cond_col_fam)
|
|
)
|
|
elif cond.get_data_source() is DataSource.Type.TIME_SERIES:
|
|
cond_entities = set(cond.get_trigger().keys())
|
|
if self.trigger_entities is None:
|
|
self.trigger_entities = cond_entities
|
|
else:
|
|
self.trigger_entities = self.trigger_entities.intersection(
|
|
cond_entities
|
|
)
|
|
if not (self.trigger_entities or self.trigger_column_families):
|
|
all_conditions_triggered = False
|
|
break
|
|
if not all_conditions_triggered: # clean up if rule not triggered
|
|
self.trigger_column_families = None
|
|
self.trigger_entities = None
|
|
return all_conditions_triggered
|
|
|
|
def __repr__(self):
|
|
# Append conditions
|
|
rule_string = "Rule: " + self.name + " has conditions:: "
|
|
is_first = True
|
|
for cond in self.conditions:
|
|
if is_first:
|
|
rule_string += cond
|
|
is_first = False
|
|
else:
|
|
rule_string += " AND " + cond
|
|
# Append suggestions
|
|
rule_string += "\nsuggestions:: "
|
|
is_first = True
|
|
for sugg in self.suggestions:
|
|
if is_first:
|
|
rule_string += sugg
|
|
is_first = False
|
|
else:
|
|
rule_string += ", " + sugg
|
|
if self.trigger_entities:
|
|
rule_string += ", entities:: " + str(self.trigger_entities)
|
|
if self.trigger_column_families:
|
|
rule_string += ", col_fam:: " + str(self.trigger_column_families)
|
|
# Return constructed string
|
|
return rule_string
|
|
|
|
|
|
class Suggestion(Section):
|
|
class Action(Enum):
|
|
set = 1
|
|
increase = 2
|
|
decrease = 3
|
|
|
|
def __init__(self, name):
|
|
super().__init__(name)
|
|
self.option = None
|
|
self.action = None
|
|
self.suggested_values = None
|
|
self.description = None
|
|
|
|
def set_parameter(self, key, value):
|
|
if key == "option":
|
|
# Note:
|
|
# case 1: 'option' is supported by Rocksdb OPTIONS file; in this
|
|
# case the option belongs to one of the sections in the config
|
|
# file and it's name is prefixed by "<section_type>."
|
|
# case 2: 'option' is not supported by Rocksdb OPTIONS file; the
|
|
# option is not expected to have the character '.' in its name
|
|
self.option = value
|
|
elif key == "action":
|
|
if self.option and not value:
|
|
raise ValueError(self.name + ": provide action for option")
|
|
self.action = self.Action[value]
|
|
elif key == "suggested_values":
|
|
if isinstance(value, str):
|
|
self.suggested_values = [value]
|
|
else:
|
|
self.suggested_values = value
|
|
elif key == "description":
|
|
self.description = value
|
|
|
|
def perform_checks(self):
|
|
if not self.description:
|
|
if not self.option:
|
|
raise ValueError(self.name + ": provide option or description")
|
|
if not self.action:
|
|
raise ValueError(self.name + ": provide action for option")
|
|
if self.action is self.Action.set and not self.suggested_values:
|
|
raise ValueError(self.name + ": provide suggested value for option")
|
|
|
|
def __repr__(self):
|
|
sugg_string = "Suggestion: " + self.name
|
|
if self.description:
|
|
sugg_string += " description : " + self.description
|
|
else:
|
|
sugg_string += " option : " + self.option + " action : " + self.action.name
|
|
if self.suggested_values:
|
|
sugg_string += " suggested_values : " + str(self.suggested_values)
|
|
return sugg_string
|
|
|
|
|
|
class Condition(Section):
|
|
def __init__(self, name):
|
|
super().__init__(name)
|
|
self.data_source = None
|
|
self.trigger = None
|
|
|
|
def perform_checks(self):
|
|
if not self.data_source:
|
|
raise ValueError(self.name + ": condition not tied to data source")
|
|
|
|
def set_data_source(self, data_source):
|
|
self.data_source = data_source
|
|
|
|
def get_data_source(self):
|
|
return self.data_source
|
|
|
|
def reset_trigger(self):
|
|
self.trigger = None
|
|
|
|
def set_trigger(self, condition_trigger):
|
|
self.trigger = condition_trigger
|
|
|
|
def get_trigger(self):
|
|
return self.trigger
|
|
|
|
def is_triggered(self):
|
|
if self.trigger:
|
|
return True
|
|
return False
|
|
|
|
def set_parameter(self, key, value):
|
|
# must be defined by the subclass
|
|
raise NotImplementedError(self.name + ": provide source for condition")
|
|
|
|
|
|
class LogCondition(Condition):
|
|
@classmethod
|
|
def create(cls, base_condition):
|
|
base_condition.set_data_source(DataSource.Type["LOG"])
|
|
base_condition.__class__ = cls
|
|
return base_condition
|
|
|
|
def set_parameter(self, key, value):
|
|
if key == "regex":
|
|
self.regex = value
|
|
|
|
def perform_checks(self):
|
|
super().perform_checks()
|
|
if not self.regex:
|
|
raise ValueError(self.name + ": provide regex for log condition")
|
|
|
|
def __repr__(self):
|
|
log_cond_str = "LogCondition: " + self.name
|
|
log_cond_str += " regex: " + self.regex
|
|
# if self.trigger:
|
|
# log_cond_str += (" trigger: " + str(self.trigger))
|
|
return log_cond_str
|
|
|
|
|
|
class OptionCondition(Condition):
|
|
@classmethod
|
|
def create(cls, base_condition):
|
|
base_condition.set_data_source(DataSource.Type["DB_OPTIONS"])
|
|
base_condition.__class__ = cls
|
|
return base_condition
|
|
|
|
def set_parameter(self, key, value):
|
|
if key == "options":
|
|
if isinstance(value, str):
|
|
self.options = [value]
|
|
else:
|
|
self.options = value
|
|
elif key == "evaluate":
|
|
self.eval_expr = value
|
|
|
|
def perform_checks(self):
|
|
super().perform_checks()
|
|
if not self.options:
|
|
raise ValueError(self.name + ": options missing in condition")
|
|
if not self.eval_expr:
|
|
raise ValueError(self.name + ": expression missing in condition")
|
|
|
|
def __repr__(self):
|
|
opt_cond_str = "OptionCondition: " + self.name
|
|
opt_cond_str += " options: " + str(self.options)
|
|
opt_cond_str += " expression: " + self.eval_expr
|
|
if self.trigger:
|
|
opt_cond_str += " trigger: " + str(self.trigger)
|
|
return opt_cond_str
|
|
|
|
|
|
class TimeSeriesCondition(Condition):
|
|
@classmethod
|
|
def create(cls, base_condition):
|
|
base_condition.set_data_source(DataSource.Type["TIME_SERIES"])
|
|
base_condition.__class__ = cls
|
|
return base_condition
|
|
|
|
def set_parameter(self, key, value):
|
|
if key == "keys":
|
|
if isinstance(value, str):
|
|
self.keys = [value]
|
|
else:
|
|
self.keys = value
|
|
elif key == "behavior":
|
|
self.behavior = TimeSeriesData.Behavior[value]
|
|
elif key == "rate_threshold":
|
|
self.rate_threshold = float(value)
|
|
elif key == "window_sec":
|
|
self.window_sec = int(value)
|
|
elif key == "evaluate":
|
|
self.expression = value
|
|
elif key == "aggregation_op":
|
|
self.aggregation_op = TimeSeriesData.AggregationOperator[value]
|
|
|
|
def perform_checks(self):
|
|
if not self.keys:
|
|
raise ValueError(self.name + ": specify timeseries key")
|
|
if not self.behavior:
|
|
raise ValueError(self.name + ": specify triggering behavior")
|
|
if self.behavior is TimeSeriesData.Behavior.bursty:
|
|
if not self.rate_threshold:
|
|
raise ValueError(self.name + ": specify rate burst threshold")
|
|
if not self.window_sec:
|
|
self.window_sec = 300 # default window length is 5 minutes
|
|
if len(self.keys) > 1:
|
|
raise ValueError(self.name + ": specify only one key")
|
|
elif self.behavior is TimeSeriesData.Behavior.evaluate_expression:
|
|
if not (self.expression):
|
|
raise ValueError(self.name + ": specify evaluation expression")
|
|
else:
|
|
raise ValueError(self.name + ": trigger behavior not supported")
|
|
|
|
def __repr__(self):
|
|
ts_cond_str = "TimeSeriesCondition: " + self.name
|
|
ts_cond_str += " statistics: " + str(self.keys)
|
|
ts_cond_str += " behavior: " + self.behavior.name
|
|
if self.behavior is TimeSeriesData.Behavior.bursty:
|
|
ts_cond_str += " rate_threshold: " + str(self.rate_threshold)
|
|
ts_cond_str += " window_sec: " + str(self.window_sec)
|
|
if self.behavior is TimeSeriesData.Behavior.evaluate_expression:
|
|
ts_cond_str += " expression: " + self.expression
|
|
if hasattr(self, "aggregation_op"):
|
|
ts_cond_str += " aggregation_op: " + self.aggregation_op.name
|
|
if self.trigger:
|
|
ts_cond_str += " trigger: " + str(self.trigger)
|
|
return ts_cond_str
|
|
|
|
|
|
class RulesSpec:
|
|
def __init__(self, rules_path):
|
|
self.file_path = rules_path
|
|
|
|
def initialise_fields(self):
|
|
self.rules_dict = {}
|
|
self.conditions_dict = {}
|
|
self.suggestions_dict = {}
|
|
|
|
def perform_section_checks(self):
|
|
for rule in self.rules_dict.values():
|
|
rule.perform_checks()
|
|
for cond in self.conditions_dict.values():
|
|
cond.perform_checks()
|
|
for sugg in self.suggestions_dict.values():
|
|
sugg.perform_checks()
|
|
|
|
def load_rules_from_spec(self):
|
|
self.initialise_fields()
|
|
with open(self.file_path, "r") as db_rules:
|
|
curr_section = None
|
|
for line in db_rules:
|
|
line = IniParser.remove_trailing_comment(line)
|
|
if not line:
|
|
continue
|
|
element = IniParser.get_element(line)
|
|
if element is IniParser.Element.comment:
|
|
continue
|
|
elif element is not IniParser.Element.key_val:
|
|
curr_section = element # it's a new IniParser header
|
|
section_name = IniParser.get_section_name(line)
|
|
if element is IniParser.Element.rule:
|
|
new_rule = Rule(section_name)
|
|
self.rules_dict[section_name] = new_rule
|
|
elif element is IniParser.Element.cond:
|
|
new_cond = Condition(section_name)
|
|
self.conditions_dict[section_name] = new_cond
|
|
elif element is IniParser.Element.sugg:
|
|
new_suggestion = Suggestion(section_name)
|
|
self.suggestions_dict[section_name] = new_suggestion
|
|
elif element is IniParser.Element.key_val:
|
|
key, value = IniParser.get_key_value_pair(line)
|
|
if curr_section is IniParser.Element.rule:
|
|
new_rule.set_parameter(key, value)
|
|
elif curr_section is IniParser.Element.cond:
|
|
if key == "source":
|
|
if value == "LOG":
|
|
new_cond = LogCondition.create(new_cond)
|
|
elif value == "OPTIONS":
|
|
new_cond = OptionCondition.create(new_cond)
|
|
elif value == "TIME_SERIES":
|
|
new_cond = TimeSeriesCondition.create(new_cond)
|
|
else:
|
|
new_cond.set_parameter(key, value)
|
|
elif curr_section is IniParser.Element.sugg:
|
|
new_suggestion.set_parameter(key, value)
|
|
|
|
def get_rules_dict(self):
|
|
return self.rules_dict
|
|
|
|
def get_conditions_dict(self):
|
|
return self.conditions_dict
|
|
|
|
def get_suggestions_dict(self):
|
|
return self.suggestions_dict
|
|
|
|
def get_triggered_rules(self, data_sources, column_families):
|
|
self.trigger_conditions(data_sources)
|
|
triggered_rules = []
|
|
for rule in self.rules_dict.values():
|
|
if rule.is_triggered(self.conditions_dict, column_families):
|
|
triggered_rules.append(rule)
|
|
return triggered_rules
|
|
|
|
def trigger_conditions(self, data_sources):
|
|
for source_type in data_sources:
|
|
cond_subset = [
|
|
cond
|
|
for cond in self.conditions_dict.values()
|
|
if cond.get_data_source() is source_type
|
|
]
|
|
if not cond_subset:
|
|
continue
|
|
for source in data_sources[source_type]:
|
|
source.check_and_trigger_conditions(cond_subset)
|
|
|
|
def print_rules(self, rules):
|
|
for rule in rules:
|
|
print("\nRule: " + rule.name)
|
|
for cond_name in rule.conditions:
|
|
print(repr(self.conditions_dict[cond_name]))
|
|
for sugg_name in rule.suggestions:
|
|
print(repr(self.suggestions_dict[sugg_name]))
|
|
if rule.trigger_entities:
|
|
print("scope: entities:")
|
|
print(rule.trigger_entities)
|
|
if rule.trigger_column_families:
|
|
print("scope: col_fam:")
|
|
print(rule.trigger_column_families)
|
|
|