|
|
|
@ -19,6 +19,7 @@ use regex::{Regex, RegexBuilder}; |
|
|
|
|
use sha1::Sha1; |
|
|
|
|
use sha2::{Sha256, Sha384, Sha512}; |
|
|
|
|
use spargebra::algebra::GraphPattern; |
|
|
|
|
use std::cell::Cell; |
|
|
|
|
use std::cmp::Ordering; |
|
|
|
|
use std::collections::hash_map::DefaultHasher; |
|
|
|
|
use std::collections::{HashMap, HashSet}; |
|
|
|
@ -27,6 +28,7 @@ use std::iter::Iterator; |
|
|
|
|
use std::iter::{empty, once}; |
|
|
|
|
use std::rc::Rc; |
|
|
|
|
use std::str; |
|
|
|
|
use std::time::Instant; |
|
|
|
|
|
|
|
|
|
const REGEX_SIZE_LIMIT: usize = 1_000_000; |
|
|
|
|
|
|
|
|
@ -108,7 +110,29 @@ impl SimpleEvaluator { |
|
|
|
|
&self, |
|
|
|
|
node: Rc<PlanNode>, |
|
|
|
|
) -> Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> { |
|
|
|
|
match node.as_ref() { |
|
|
|
|
self.build_plan_evaluator(node, false).0 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub fn plan_evaluator_with_stats( |
|
|
|
|
&self, |
|
|
|
|
node: Rc<PlanNode>, |
|
|
|
|
) -> ( |
|
|
|
|
Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator>, |
|
|
|
|
Rc<PlanNodeWithStats>, |
|
|
|
|
) { |
|
|
|
|
self.build_plan_evaluator(node, true) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn build_plan_evaluator( |
|
|
|
|
&self, |
|
|
|
|
node: Rc<PlanNode>, |
|
|
|
|
run_stats: bool, |
|
|
|
|
) -> ( |
|
|
|
|
Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator>, |
|
|
|
|
Rc<PlanNodeWithStats>, |
|
|
|
|
) { |
|
|
|
|
let mut stat_children = Vec::new(); |
|
|
|
|
let mut evaluator: Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> = match node.as_ref() { |
|
|
|
|
PlanNode::StaticBindings { encoded_tuples, .. } => { |
|
|
|
|
let tuples = encoded_tuples.clone(); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
@ -348,8 +372,10 @@ impl SimpleEvaluator { |
|
|
|
|
.intersection(&right.always_bound_variables()) |
|
|
|
|
.copied() |
|
|
|
|
.collect(); |
|
|
|
|
let left = self.plan_evaluator(left.clone()); |
|
|
|
|
let right = self.plan_evaluator(right.clone()); |
|
|
|
|
let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); |
|
|
|
|
stat_children.push(left_stats); |
|
|
|
|
let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); |
|
|
|
|
stat_children.push(right_stats); |
|
|
|
|
if join_keys.is_empty() { |
|
|
|
|
// Cartesian product
|
|
|
|
|
Rc::new(move |from| { |
|
|
|
@ -392,8 +418,10 @@ impl SimpleEvaluator { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
PlanNode::ForLoopJoin { left, right } => { |
|
|
|
|
let left = self.plan_evaluator(left.clone()); |
|
|
|
|
let right = self.plan_evaluator(right.clone()); |
|
|
|
|
let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); |
|
|
|
|
stat_children.push(left_stats); |
|
|
|
|
let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); |
|
|
|
|
stat_children.push(right_stats); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let right = right.clone(); |
|
|
|
|
Box::new(left(from).flat_map(move |t| match t { |
|
|
|
@ -408,8 +436,10 @@ impl SimpleEvaluator { |
|
|
|
|
.intersection(&right.always_bound_variables()) |
|
|
|
|
.copied() |
|
|
|
|
.collect(); |
|
|
|
|
let left = self.plan_evaluator(left.clone()); |
|
|
|
|
let right = self.plan_evaluator(right.clone()); |
|
|
|
|
let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); |
|
|
|
|
stat_children.push(left_stats); |
|
|
|
|
let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); |
|
|
|
|
stat_children.push(right_stats); |
|
|
|
|
if join_keys.is_empty() { |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let right: Vec<_> = right(from.clone()).filter_map(Result::ok).collect(); |
|
|
|
@ -449,8 +479,10 @@ impl SimpleEvaluator { |
|
|
|
|
.intersection(&right.always_bound_variables()) |
|
|
|
|
.copied() |
|
|
|
|
.collect(); |
|
|
|
|
let left = self.plan_evaluator(left.clone()); |
|
|
|
|
let right = self.plan_evaluator(right.clone()); |
|
|
|
|
let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); |
|
|
|
|
stat_children.push(left_stats); |
|
|
|
|
let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); |
|
|
|
|
stat_children.push(right_stats); |
|
|
|
|
let expression = self.expression_evaluator(expression); |
|
|
|
|
// Real hash join
|
|
|
|
|
Rc::new(move |from| { |
|
|
|
@ -476,8 +508,10 @@ impl SimpleEvaluator { |
|
|
|
|
right, |
|
|
|
|
possible_problem_vars, |
|
|
|
|
} => { |
|
|
|
|
let left = self.plan_evaluator(left.clone()); |
|
|
|
|
let right = self.plan_evaluator(right.clone()); |
|
|
|
|
let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); |
|
|
|
|
stat_children.push(left_stats); |
|
|
|
|
let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); |
|
|
|
|
stat_children.push(right_stats); |
|
|
|
|
let possible_problem_vars = possible_problem_vars.clone(); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
if possible_problem_vars.is_empty() { |
|
|
|
@ -499,7 +533,8 @@ impl SimpleEvaluator { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
PlanNode::Filter { child, expression } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let expression = self.expression_evaluator(expression); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let expression = expression.clone(); |
|
|
|
@ -516,7 +551,12 @@ impl SimpleEvaluator { |
|
|
|
|
PlanNode::Union { children } => { |
|
|
|
|
let children: Vec<_> = children |
|
|
|
|
.iter() |
|
|
|
|
.map(|child| self.plan_evaluator(child.clone())) |
|
|
|
|
.map(|child| { |
|
|
|
|
let (child, child_stats) = |
|
|
|
|
self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
child |
|
|
|
|
}) |
|
|
|
|
.collect(); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
Box::new(UnionIterator { |
|
|
|
@ -532,7 +572,8 @@ impl SimpleEvaluator { |
|
|
|
|
variable, |
|
|
|
|
expression, |
|
|
|
|
} => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let position = variable.encoded; |
|
|
|
|
let expression = self.expression_evaluator(expression); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
@ -547,7 +588,8 @@ impl SimpleEvaluator { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
PlanNode::Sort { child, by } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let by: Vec<_> = by |
|
|
|
|
.iter() |
|
|
|
|
.map(|comp| match comp { |
|
|
|
@ -604,11 +646,13 @@ impl SimpleEvaluator { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
PlanNode::HashDeduplicate { child } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
Rc::new(move |from| Box::new(hash_deduplicate(child(from)))) |
|
|
|
|
} |
|
|
|
|
PlanNode::Reduced { child } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
Box::new(ConsecutiveDeduplication { |
|
|
|
|
inner: child(from), |
|
|
|
@ -617,17 +661,20 @@ impl SimpleEvaluator { |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
PlanNode::Skip { child, count } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let count = *count; |
|
|
|
|
Rc::new(move |from| Box::new(child(from).skip(count))) |
|
|
|
|
} |
|
|
|
|
PlanNode::Limit { child, count } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let count = *count; |
|
|
|
|
Rc::new(move |from| Box::new(child(from).take(count))) |
|
|
|
|
} |
|
|
|
|
PlanNode::Project { child, mapping } => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let mapping = mapping.clone(); |
|
|
|
|
Rc::new(move |from| { |
|
|
|
|
let mapping = mapping.clone(); |
|
|
|
@ -666,7 +713,8 @@ impl SimpleEvaluator { |
|
|
|
|
key_variables, |
|
|
|
|
aggregates, |
|
|
|
|
} => { |
|
|
|
|
let child = self.plan_evaluator(child.clone()); |
|
|
|
|
let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); |
|
|
|
|
stat_children.push(child_stats); |
|
|
|
|
let key_variables = key_variables.clone(); |
|
|
|
|
let aggregate_input_expressions: Vec<_> = aggregates |
|
|
|
|
.iter() |
|
|
|
@ -755,7 +803,28 @@ impl SimpleEvaluator { |
|
|
|
|
) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let stats = Rc::new(PlanNodeWithStats { |
|
|
|
|
node, |
|
|
|
|
children: stat_children, |
|
|
|
|
exec_count: Cell::new(0), |
|
|
|
|
exec_duration: Cell::new(std::time::Duration::from_secs(0)), |
|
|
|
|
}); |
|
|
|
|
if run_stats { |
|
|
|
|
let stats = stats.clone(); |
|
|
|
|
evaluator = Rc::new(move |tuple| { |
|
|
|
|
let start = Instant::now(); |
|
|
|
|
let inner = evaluator(tuple); |
|
|
|
|
stats |
|
|
|
|
.exec_duration |
|
|
|
|
.set(stats.exec_duration.get() + start.elapsed()); |
|
|
|
|
Box::new(StatsIterator { |
|
|
|
|
inner, |
|
|
|
|
stats: stats.clone(), |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
(evaluator, stats) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn evaluate_service( |
|
|
|
@ -4697,6 +4766,27 @@ impl Extend<EncodedTuple> for EncodedTupleSet { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct StatsIterator { |
|
|
|
|
inner: EncodedTuplesIterator, |
|
|
|
|
stats: Rc<PlanNodeWithStats>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl Iterator for StatsIterator { |
|
|
|
|
type Item = Result<EncodedTuple, EvaluationError>; |
|
|
|
|
|
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTuple, EvaluationError>> { |
|
|
|
|
let start = Instant::now(); |
|
|
|
|
let result = self.inner.next(); |
|
|
|
|
self.stats |
|
|
|
|
.exec_duration |
|
|
|
|
.set(self.stats.exec_duration.get() + start.elapsed()); |
|
|
|
|
if matches!(result, Some(Ok(_))) { |
|
|
|
|
self.stats.exec_count.set(self.stats.exec_count.get() + 1); |
|
|
|
|
} |
|
|
|
|
result |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
|
fn uuid() { |
|
|
|
|
let mut buffer = String::default(); |
|
|
|
|