diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index 201c47f3..67de2aac 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -19,6 +19,7 @@ use regex::{Regex, RegexBuilder}; use sha1::Sha1; use sha2::{Sha256, Sha384, Sha512}; use spargebra::algebra::GraphPattern; +use std::cell::Cell; use std::cmp::Ordering; use std::collections::hash_map::DefaultHasher; use std::collections::{HashMap, HashSet}; @@ -27,6 +28,7 @@ use std::iter::Iterator; use std::iter::{empty, once}; use std::rc::Rc; use std::str; +use std::time::Instant; const REGEX_SIZE_LIMIT: usize = 1_000_000; @@ -108,7 +110,29 @@ impl SimpleEvaluator { &self, node: Rc, ) -> Rc EncodedTuplesIterator> { - match node.as_ref() { + self.build_plan_evaluator(node, false).0 + } + + pub fn plan_evaluator_with_stats( + &self, + node: Rc, + ) -> ( + Rc EncodedTuplesIterator>, + Rc, + ) { + self.build_plan_evaluator(node, true) + } + + fn build_plan_evaluator( + &self, + node: Rc, + run_stats: bool, + ) -> ( + Rc EncodedTuplesIterator>, + Rc, + ) { + let mut stat_children = Vec::new(); + let mut evaluator: Rc EncodedTuplesIterator> = match node.as_ref() { PlanNode::StaticBindings { encoded_tuples, .. } => { let tuples = encoded_tuples.clone(); Rc::new(move |from| { @@ -348,8 +372,10 @@ impl SimpleEvaluator { .intersection(&right.always_bound_variables()) .copied() .collect(); - let left = self.plan_evaluator(left.clone()); - let right = self.plan_evaluator(right.clone()); + let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); + stat_children.push(left_stats); + let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); + stat_children.push(right_stats); if join_keys.is_empty() { // Cartesian product Rc::new(move |from| { @@ -392,8 +418,10 @@ impl SimpleEvaluator { } } PlanNode::ForLoopJoin { left, right } => { - let left = self.plan_evaluator(left.clone()); - let right = self.plan_evaluator(right.clone()); + let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); + stat_children.push(left_stats); + let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); + stat_children.push(right_stats); Rc::new(move |from| { let right = right.clone(); Box::new(left(from).flat_map(move |t| match t { @@ -408,8 +436,10 @@ impl SimpleEvaluator { .intersection(&right.always_bound_variables()) .copied() .collect(); - let left = self.plan_evaluator(left.clone()); - let right = self.plan_evaluator(right.clone()); + let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); + stat_children.push(left_stats); + let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); + stat_children.push(right_stats); if join_keys.is_empty() { Rc::new(move |from| { let right: Vec<_> = right(from.clone()).filter_map(Result::ok).collect(); @@ -449,8 +479,10 @@ impl SimpleEvaluator { .intersection(&right.always_bound_variables()) .copied() .collect(); - let left = self.plan_evaluator(left.clone()); - let right = self.plan_evaluator(right.clone()); + let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); + stat_children.push(left_stats); + let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); + stat_children.push(right_stats); let expression = self.expression_evaluator(expression); // Real hash join Rc::new(move |from| { @@ -476,8 +508,10 @@ impl SimpleEvaluator { right, possible_problem_vars, } => { - let left = self.plan_evaluator(left.clone()); - let right = self.plan_evaluator(right.clone()); + let (left, left_stats) = self.build_plan_evaluator(left.clone(), run_stats); + stat_children.push(left_stats); + let (right, right_stats) = self.build_plan_evaluator(right.clone(), run_stats); + stat_children.push(right_stats); let possible_problem_vars = possible_problem_vars.clone(); Rc::new(move |from| { if possible_problem_vars.is_empty() { @@ -499,7 +533,8 @@ impl SimpleEvaluator { }) } PlanNode::Filter { child, expression } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let expression = self.expression_evaluator(expression); Rc::new(move |from| { let expression = expression.clone(); @@ -516,7 +551,12 @@ impl SimpleEvaluator { PlanNode::Union { children } => { let children: Vec<_> = children .iter() - .map(|child| self.plan_evaluator(child.clone())) + .map(|child| { + let (child, child_stats) = + self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); + child + }) .collect(); Rc::new(move |from| { Box::new(UnionIterator { @@ -532,7 +572,8 @@ impl SimpleEvaluator { variable, expression, } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let position = variable.encoded; let expression = self.expression_evaluator(expression); Rc::new(move |from| { @@ -547,7 +588,8 @@ impl SimpleEvaluator { }) } PlanNode::Sort { child, by } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let by: Vec<_> = by .iter() .map(|comp| match comp { @@ -604,11 +646,13 @@ impl SimpleEvaluator { }) } PlanNode::HashDeduplicate { child } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); Rc::new(move |from| Box::new(hash_deduplicate(child(from)))) } PlanNode::Reduced { child } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); Rc::new(move |from| { Box::new(ConsecutiveDeduplication { inner: child(from), @@ -617,17 +661,20 @@ impl SimpleEvaluator { }) } PlanNode::Skip { child, count } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let count = *count; Rc::new(move |from| Box::new(child(from).skip(count))) } PlanNode::Limit { child, count } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let count = *count; Rc::new(move |from| Box::new(child(from).take(count))) } PlanNode::Project { child, mapping } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let mapping = mapping.clone(); Rc::new(move |from| { let mapping = mapping.clone(); @@ -666,7 +713,8 @@ impl SimpleEvaluator { key_variables, aggregates, } => { - let child = self.plan_evaluator(child.clone()); + let (child, child_stats) = self.build_plan_evaluator(child.clone(), run_stats); + stat_children.push(child_stats); let key_variables = key_variables.clone(); let aggregate_input_expressions: Vec<_> = aggregates .iter() @@ -755,7 +803,28 @@ impl SimpleEvaluator { ) }) } + }; + let stats = Rc::new(PlanNodeWithStats { + node, + children: stat_children, + exec_count: Cell::new(0), + exec_duration: Cell::new(std::time::Duration::from_secs(0)), + }); + if run_stats { + let stats = stats.clone(); + evaluator = Rc::new(move |tuple| { + let start = Instant::now(); + let inner = evaluator(tuple); + stats + .exec_duration + .set(stats.exec_duration.get() + start.elapsed()); + Box::new(StatsIterator { + inner, + stats: stats.clone(), + }) + }) } + (evaluator, stats) } fn evaluate_service( @@ -4697,6 +4766,27 @@ impl Extend for EncodedTupleSet { } } +struct StatsIterator { + inner: EncodedTuplesIterator, + stats: Rc, +} + +impl Iterator for StatsIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + let start = Instant::now(); + let result = self.inner.next(); + self.stats + .exec_duration + .set(self.stats.exec_duration.get() + start.elapsed()); + if matches!(result, Some(Ok(_))) { + self.stats.exec_count.set(self.stats.exec_count.get() + 1); + } + result + } +} + #[test] fn uuid() { let mut buffer = String::default(); diff --git a/lib/src/sparql/plan.rs b/lib/src/sparql/plan.rs index 6b78645a..9db35492 100644 --- a/lib/src/sparql/plan.rs +++ b/lib/src/sparql/plan.rs @@ -4,10 +4,12 @@ use crate::storage::numeric_encoder::EncodedTerm; use regex::Regex; use spargebra::algebra::GraphPattern; use spargebra::term::GroundTerm; +use std::cell::Cell; use std::cmp::max; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; use std::rc::Rc; +use std::time::Duration; #[derive(Debug)] pub enum PlanNode { @@ -752,3 +754,10 @@ impl IntoIterator for EncodedTuple { self.inner.into_iter() } } + +pub struct PlanNodeWithStats { + pub node: Rc, + pub children: Vec>, + pub exec_count: Cell, + pub exec_duration: Cell, +}