diff --git a/lib/src/sparql/eval.rs b/lib/src/sparql/eval.rs index 8821fb1e..8ab0d8ba 100644 --- a/lib/src/sparql/eval.rs +++ b/lib/src/sparql/eval.rs @@ -9,6 +9,7 @@ use crate::sparql::service::ServiceHandler; use crate::storage::numeric_encoder::*; use crate::storage::small_string::SmallString; use digest::Digest; +use json_event_parser::{JsonEvent, JsonWriter}; use md5::Md5; use oxilangtag::LanguageTag; use oxiri::Iri; @@ -27,10 +28,10 @@ use std::hash::{Hash, Hasher}; use std::iter::Iterator; use std::iter::{empty, once}; use std::rc::Rc; -use std::str; use std::time::Duration as StdDuration; #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] use std::time::Instant; +use std::{fmt, io, str}; const REGEX_SIZE_LIMIT: usize = 1_000_000; @@ -68,9 +69,9 @@ impl SimpleEvaluator { #[allow(clippy::rc_buffer)] pub fn evaluate_select_plan( &self, - plan: Rc, + plan: &PlanNode, variables: Rc>, - ) -> (QueryResults, Rc) { + ) -> (QueryResults, Rc) { let (eval, stats) = self.plan_evaluator(plan); ( QueryResults::Solutions(decode_bindings( @@ -84,8 +85,8 @@ impl SimpleEvaluator { pub fn evaluate_ask_plan( &self, - plan: Rc, - ) -> (Result, Rc) { + plan: &PlanNode, + ) -> (Result, Rc) { let from = EncodedTuple::with_capacity(plan.used_variables().len()); let (eval, stats) = self.plan_evaluator(plan); ( @@ -100,9 +101,9 @@ impl SimpleEvaluator { pub fn evaluate_construct_plan( &self, - plan: Rc, + plan: &PlanNode, template: Vec, - ) -> (QueryResults, Rc) { + ) -> (QueryResults, Rc) { let from = EncodedTuple::with_capacity(plan.used_variables().len()); let (eval, stats) = self.plan_evaluator(plan); ( @@ -119,10 +120,7 @@ impl SimpleEvaluator { ) } - pub fn evaluate_describe_plan( - &self, - plan: Rc, - ) -> (QueryResults, Rc) { + pub fn evaluate_describe_plan(&self, plan: &PlanNode) -> (QueryResults, Rc) { let from = EncodedTuple::with_capacity(plan.used_variables().len()); let (eval, stats) = self.plan_evaluator(plan); ( @@ -139,13 +137,13 @@ impl SimpleEvaluator { pub fn plan_evaluator( &self, - node: Rc, + node: &PlanNode, ) -> ( Rc EncodedTuplesIterator>, - Rc, + Rc, ) { let mut stat_children = Vec::new(); - let mut evaluator: Rc EncodedTuplesIterator> = match node.as_ref() { + let mut evaluator: Rc EncodedTuplesIterator> = match node { PlanNode::StaticBindings { encoded_tuples, .. } => { let tuples = encoded_tuples.clone(); Rc::new(move |from| { @@ -388,9 +386,9 @@ impl SimpleEvaluator { .intersection(&build_child.always_bound_variables()) .copied() .collect(); - let (probe, probe_stats) = self.plan_evaluator(Rc::clone(probe_child)); + let (probe, probe_stats) = self.plan_evaluator(probe_child); stat_children.push(probe_stats); - let (build, build_stats) = self.plan_evaluator(Rc::clone(build_child)); + let (build, build_stats) = self.plan_evaluator(build_child); stat_children.push(build_stats); if join_keys.is_empty() { // Cartesian product @@ -434,9 +432,9 @@ impl SimpleEvaluator { } } PlanNode::ForLoopJoin { left, right } => { - let (left, left_stats) = self.plan_evaluator(Rc::clone(left)); + let (left, left_stats) = self.plan_evaluator(left); stat_children.push(left_stats); - let (right, right_stats) = self.plan_evaluator(Rc::clone(right)); + let (right, right_stats) = self.plan_evaluator(right); stat_children.push(right_stats); Rc::new(move |from| { let right = Rc::clone(&right); @@ -452,9 +450,9 @@ impl SimpleEvaluator { .intersection(&right.always_bound_variables()) .copied() .collect(); - let (left, left_stats) = self.plan_evaluator(Rc::clone(left)); + let (left, left_stats) = self.plan_evaluator(left); stat_children.push(left_stats); - let (right, right_stats) = self.plan_evaluator(Rc::clone(right)); + let (right, right_stats) = self.plan_evaluator(right); stat_children.push(right_stats); if join_keys.is_empty() { Rc::new(move |from| { @@ -495,9 +493,9 @@ impl SimpleEvaluator { .intersection(&right.always_bound_variables()) .copied() .collect(); - let (left, left_stats) = self.plan_evaluator(Rc::clone(left)); + let (left, left_stats) = self.plan_evaluator(left); stat_children.push(left_stats); - let (right, right_stats) = self.plan_evaluator(Rc::clone(right)); + let (right, right_stats) = self.plan_evaluator(right); stat_children.push(right_stats); let expression = self.expression_evaluator(expression, &mut stat_children); // Real hash join @@ -520,9 +518,9 @@ impl SimpleEvaluator { }) } PlanNode::ForLoopLeftJoin { left, right } => { - let (left, left_stats) = self.plan_evaluator(Rc::clone(left)); + let (left, left_stats) = self.plan_evaluator(left); stat_children.push(left_stats); - let (right, right_stats) = self.plan_evaluator(Rc::clone(right)); + let (right, right_stats) = self.plan_evaluator(right); stat_children.push(right_stats); Rc::new(move |from| { Box::new(ForLoopLeftJoinIterator { @@ -533,7 +531,7 @@ impl SimpleEvaluator { }) } PlanNode::Filter { child, expression } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let expression = self.expression_evaluator(expression, &mut stat_children); Rc::new(move |from| { @@ -552,7 +550,7 @@ impl SimpleEvaluator { let children: Vec<_> = children .iter() .map(|child| { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); child }) @@ -571,7 +569,7 @@ impl SimpleEvaluator { variable, expression, } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let position = variable.encoded; let expression = self.expression_evaluator(expression, &mut stat_children); @@ -587,7 +585,7 @@ impl SimpleEvaluator { }) } PlanNode::Sort { child, by } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let by: Vec<_> = by .iter() @@ -645,12 +643,12 @@ impl SimpleEvaluator { }) } PlanNode::HashDeduplicate { child } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); Rc::new(move |from| Box::new(hash_deduplicate(child(from)))) } PlanNode::Reduced { child } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); Rc::new(move |from| { Box::new(ConsecutiveDeduplication { @@ -660,19 +658,19 @@ impl SimpleEvaluator { }) } PlanNode::Skip { child, count } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let count = *count; Rc::new(move |from| Box::new(child(from).skip(count))) } PlanNode::Limit { child, count } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let count = *count; Rc::new(move |from| Box::new(child(from).take(count))) } PlanNode::Project { child, mapping } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let mapping = Rc::clone(mapping); Rc::new(move |from| { @@ -712,7 +710,7 @@ impl SimpleEvaluator { key_variables, aggregates, } => { - let (child, child_stats) = self.plan_evaluator(Rc::clone(child)); + let (child, child_stats) = self.plan_evaluator(child); stat_children.push(child_stats); let key_variables = Rc::clone(key_variables); let aggregate_input_expressions: Vec<_> = aggregates @@ -806,11 +804,11 @@ impl SimpleEvaluator { }) } }; - let stats = Rc::new(PlanNodeWithStats { - node, + let stats = Rc::new(EvalNodeWithStats { + label: eval_node_label(node), children: stat_children, exec_count: Cell::new(0), - exec_duration: Cell::new(std::time::Duration::from_secs(0)), + exec_duration: Cell::new(StdDuration::from_secs(0)), }); if self.run_stats { let stats = Rc::clone(&stats); @@ -920,7 +918,7 @@ impl SimpleEvaluator { fn expression_evaluator( &self, expression: &PlanExpression, - stat_children: &mut Vec>, + stat_children: &mut Vec>, ) -> Rc Option> { match expression { PlanExpression::NamedNode(t) => { @@ -936,7 +934,7 @@ impl SimpleEvaluator { Rc::new(move |tuple| tuple.get(v).cloned()) } PlanExpression::Exists(plan) => { - let (eval, stats) = self.plan_evaluator(Rc::clone(plan)); + let (eval, stats) = self.plan_evaluator(plan); stat_children.push(stats); Rc::new(move |tuple| Some(eval(tuple.clone()).next().is_some().into())) } @@ -2086,7 +2084,7 @@ impl SimpleEvaluator { fn hash( &self, arg: &PlanExpression, - stat_children: &mut Vec>, + stat_children: &mut Vec>, ) -> Rc Option> { let arg = self.expression_evaluator(arg, stat_children); let dataset = Rc::clone(&self.dataset); @@ -4719,7 +4717,7 @@ impl Extend for EncodedTupleSet { struct StatsIterator { inner: EncodedTuplesIterator, - stats: Rc, + stats: Rc, } impl Iterator for StatsIterator { @@ -4738,6 +4736,145 @@ impl Iterator for StatsIterator { } } +pub struct EvalNodeWithStats { + pub label: String, + pub children: Vec>, + pub exec_count: Cell, + pub exec_duration: Cell, +} + +impl EvalNodeWithStats { + pub fn json_node( + &self, + writer: &mut JsonWriter, + with_stats: bool, + ) -> io::Result<()> { + writer.write_event(JsonEvent::StartObject)?; + writer.write_event(JsonEvent::ObjectKey("name"))?; + writer.write_event(JsonEvent::String(&self.label))?; + if with_stats { + writer.write_event(JsonEvent::ObjectKey("number of results"))?; + writer.write_event(JsonEvent::Number(&self.exec_count.get().to_string()))?; + writer.write_event(JsonEvent::ObjectKey("duration in seconds"))?; + writer.write_event(JsonEvent::Number( + &self.exec_duration.get().as_secs_f32().to_string(), + ))?; + } + writer.write_event(JsonEvent::ObjectKey("children"))?; + writer.write_event(JsonEvent::StartArray)?; + for child in &self.children { + child.json_node(writer, with_stats)?; + } + writer.write_event(JsonEvent::EndArray)?; + writer.write_event(JsonEvent::EndObject) + } +} + +impl fmt::Debug for EvalNodeWithStats { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut obj = f.debug_struct("Node"); + obj.field("name", &self.label); + if self.exec_duration.get() > StdDuration::default() { + obj.field("number of results", &self.exec_count.get()); + obj.field("duration in seconds", &self.exec_duration.get()); + } + if !self.children.is_empty() { + obj.field("children", &self.children); + } + obj.finish() + } +} + +fn eval_node_label(node: &PlanNode) -> String { + match node { + PlanNode::Aggregate { + key_variables, + aggregates, + .. + } => format!( + "Aggregate({})", + key_variables + .iter() + .map(ToString::to_string) + .chain(aggregates.iter().map(|(agg, v)| format!("{agg} -> {v}"))) + .collect::>() + .join(", ") + ), + PlanNode::AntiJoin { .. } => "AntiJoin".to_owned(), + PlanNode::Extend { + expression, + variable, + .. + } => format!("Extend({expression} -> {variable})"), + PlanNode::Filter { expression, .. } => format!("Filter({expression})"), + PlanNode::ForLoopJoin { .. } => "ForLoopJoin".to_owned(), + PlanNode::ForLoopLeftJoin { .. } => "ForLoopLeftJoin".to_owned(), + PlanNode::HashDeduplicate { .. } => "HashDeduplicate".to_owned(), + PlanNode::HashJoin { .. } => "HashJoin".to_owned(), + PlanNode::HashLeftJoin { expression, .. } => format!("HashLeftJoin({expression})"), + PlanNode::Limit { count, .. } => format!("Limit({count})"), + PlanNode::PathPattern { + subject, + path, + object, + graph_name, + } => format!("PathPattern({subject} {path} {object} {graph_name})"), + PlanNode::Project { mapping, .. } => { + format!( + "Project({})", + mapping + .iter() + .map(|(f, t)| if f.plain == t.plain { + f.to_string() + } else { + format!("{f} -> {t}") + }) + .collect::>() + .join(", ") + ) + } + PlanNode::QuadPattern { + subject, + predicate, + object, + graph_name, + } => format!("QuadPattern({subject} {predicate} {object} {graph_name})"), + PlanNode::Reduced { .. } => "Reduced".to_owned(), + PlanNode::Service { + service_name, + silent, + .. + } => { + if *silent { + format!("SilentService({service_name})") + } else { + format!("Service({service_name})") + } + } + PlanNode::Skip { count, .. } => format!("Skip({count})"), + PlanNode::Sort { by, .. } => { + format!( + "Sort({})", + by.iter() + .map(ToString::to_string) + .collect::>() + .join(", ") + ) + } + PlanNode::StaticBindings { variables, .. } => { + format!( + "StaticBindings({})", + variables + .iter() + .map(ToString::to_string) + .collect::>() + .join(", ") + ) + } + PlanNode::Union { .. } => "Union".to_owned(), + } +} + #[cfg(all(target_family = "wasm", target_os = "unknown"))] pub struct Timer { timestamp_ms: f64, diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 9612d514..e66e114b 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -17,9 +17,8 @@ use crate::model::{NamedNode, Term}; pub use crate::sparql::algebra::{Query, QueryDataset, Update}; use crate::sparql::dataset::DatasetView; pub use crate::sparql::error::{EvaluationError, QueryError}; -use crate::sparql::eval::{SimpleEvaluator, Timer}; +use crate::sparql::eval::{EvalNodeWithStats, SimpleEvaluator, Timer}; pub use crate::sparql::model::{QueryResults, QuerySolution, QuerySolutionIter, QueryTripleIter}; -use crate::sparql::plan::PlanNodeWithStats; use crate::sparql::plan_builder::PlanBuilder; pub use crate::sparql::service::ServiceHandler; use crate::sparql::service::{EmptyServiceHandler, ErrorConversionServiceHandler}; @@ -63,7 +62,7 @@ pub(crate) fn evaluate_query( Rc::new(options.custom_functions), run_stats, ) - .evaluate_select_plan(Rc::new(plan), Rc::new(variables)); + .evaluate_select_plan(&plan, Rc::new(variables)); (Ok(results), explanation, planning_duration) } spargebra::Query::Ask { @@ -84,7 +83,7 @@ pub(crate) fn evaluate_query( Rc::new(options.custom_functions), run_stats, ) - .evaluate_ask_plan(Rc::new(plan)); + .evaluate_ask_plan(&plan); (results, explanation, planning_duration) } spargebra::Query::Construct { @@ -114,7 +113,7 @@ pub(crate) fn evaluate_query( Rc::new(options.custom_functions), run_stats, ) - .evaluate_construct_plan(Rc::new(plan), construct); + .evaluate_construct_plan(&plan, construct); (Ok(results), explanation, planning_duration) } spargebra::Query::Describe { @@ -135,7 +134,7 @@ pub(crate) fn evaluate_query( Rc::new(options.custom_functions), run_stats, ) - .evaluate_describe_plan(Rc::new(plan)); + .evaluate_describe_plan(&plan); (Ok(results), explanation, planning_duration) } }; @@ -284,7 +283,7 @@ impl From for UpdateOptions { /// The explanation of a query. #[derive(Clone)] pub struct QueryExplanation { - inner: Rc, + inner: Rc, with_stats: bool, parsing_duration: Option, planning_duration: Duration, diff --git a/lib/src/sparql/plan.rs b/lib/src/sparql/plan.rs index 905e4da0..a447fb01 100644 --- a/lib/src/sparql/plan.rs +++ b/lib/src/sparql/plan.rs @@ -1,17 +1,14 @@ use crate::model::{BlankNode, Literal, NamedNode, Term, Triple}; use crate::sparql::Variable; use crate::storage::numeric_encoder::EncodedTerm; -use json_event_parser::{JsonEvent, JsonWriter}; use regex::Regex; use spargebra::algebra::GraphPattern; use spargebra::term::GroundTerm; -use std::cell::Cell; use std::cmp::max; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet}; +use std::fmt; use std::rc::Rc; -use std::time::Duration; -use std::{fmt, io}; #[derive(Debug, Clone)] pub enum PlanNode { @@ -1022,142 +1019,3 @@ impl IntoIterator for EncodedTuple { self.inner.into_iter() } } - -pub struct PlanNodeWithStats { - pub node: Rc, - pub children: Vec>, - pub exec_count: Cell, - pub exec_duration: Cell, -} - -impl PlanNodeWithStats { - pub fn json_node( - &self, - writer: &mut JsonWriter, - with_stats: bool, - ) -> io::Result<()> { - writer.write_event(JsonEvent::StartObject)?; - writer.write_event(JsonEvent::ObjectKey("name"))?; - writer.write_event(JsonEvent::String(&self.node_label()))?; - if with_stats { - writer.write_event(JsonEvent::ObjectKey("number of results"))?; - writer.write_event(JsonEvent::Number(&self.exec_count.get().to_string()))?; - writer.write_event(JsonEvent::ObjectKey("duration in seconds"))?; - writer.write_event(JsonEvent::Number( - &self.exec_duration.get().as_secs_f32().to_string(), - ))?; - } - writer.write_event(JsonEvent::ObjectKey("children"))?; - writer.write_event(JsonEvent::StartArray)?; - for child in &self.children { - child.json_node(writer, with_stats)?; - } - writer.write_event(JsonEvent::EndArray)?; - writer.write_event(JsonEvent::EndObject) - } - - fn node_label(&self) -> String { - match self.node.as_ref() { - PlanNode::Aggregate { - key_variables, - aggregates, - .. - } => format!( - "Aggregate({})", - key_variables - .iter() - .map(ToString::to_string) - .chain(aggregates.iter().map(|(agg, v)| format!("{agg} -> {v}"))) - .collect::>() - .join(", ") - ), - PlanNode::AntiJoin { .. } => "AntiJoin".to_owned(), - PlanNode::Extend { - expression, - variable, - .. - } => format!("Extend({expression} -> {variable})"), - PlanNode::Filter { expression, .. } => format!("Filter({expression})"), - PlanNode::ForLoopJoin { .. } => "ForLoopJoin".to_owned(), - PlanNode::ForLoopLeftJoin { .. } => "ForLoopLeftJoin".to_owned(), - PlanNode::HashDeduplicate { .. } => "HashDeduplicate".to_owned(), - PlanNode::HashJoin { .. } => "HashJoin".to_owned(), - PlanNode::HashLeftJoin { expression, .. } => format!("HashLeftJoin({expression})"), - PlanNode::Limit { count, .. } => format!("Limit({count})"), - PlanNode::PathPattern { - subject, - path, - object, - graph_name, - } => format!("PathPattern({subject} {path} {object} {graph_name})"), - PlanNode::Project { mapping, .. } => { - format!( - "Project({})", - mapping - .iter() - .map(|(f, t)| if f.plain == t.plain { - f.to_string() - } else { - format!("{f} -> {t}") - }) - .collect::>() - .join(", ") - ) - } - PlanNode::QuadPattern { - subject, - predicate, - object, - graph_name, - } => format!("QuadPattern({subject} {predicate} {object} {graph_name})"), - PlanNode::Reduced { .. } => "Reduced".to_owned(), - PlanNode::Service { - service_name, - silent, - .. - } => { - if *silent { - format!("SilentService({service_name})") - } else { - format!("Service({service_name})") - } - } - PlanNode::Skip { count, .. } => format!("Skip({count})"), - PlanNode::Sort { by, .. } => { - format!( - "Sort({})", - by.iter() - .map(ToString::to_string) - .collect::>() - .join(", ") - ) - } - PlanNode::StaticBindings { variables, .. } => { - format!( - "StaticBindings({})", - variables - .iter() - .map(ToString::to_string) - .collect::>() - .join(", ") - ) - } - PlanNode::Union { .. } => "Union".to_owned(), - } - } -} - -impl fmt::Debug for PlanNodeWithStats { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut obj = f.debug_struct("Node"); - obj.field("name", &self.node_label()); - if self.exec_duration.get() > Duration::default() { - obj.field("number of results", &self.exec_count.get()); - obj.field("duration in seconds", &self.exec_duration.get()); - } - if !self.children.is_empty() { - obj.field("children", &self.children); - } - obj.finish() - } -} diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 3c05c8f5..3091172f 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -140,7 +140,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> { false, ); let mut bnodes = HashMap::new(); - let (eval, _) = evaluator.plan_evaluator(Rc::new(plan)); + let (eval, _) = evaluator.plan_evaluator(&plan); let tuples = eval(EncodedTuple::with_capacity(variables.len())).collect::, _>>()?; //TODO: would be much better to stream for tuple in tuples {