Makes profiler independent from query plan

pull/570/head
Thomas 2 years ago committed by Thomas Tanon
parent 24a1dd2556
commit 501f9ce6f9
  1. 219
      lib/src/sparql/eval.rs
  2. 13
      lib/src/sparql/mod.rs
  3. 144
      lib/src/sparql/plan.rs
  4. 2
      lib/src/sparql/update.rs

@ -9,6 +9,7 @@ use crate::sparql::service::ServiceHandler;
use crate::storage::numeric_encoder::*;
use crate::storage::small_string::SmallString;
use digest::Digest;
use json_event_parser::{JsonEvent, JsonWriter};
use md5::Md5;
use oxilangtag::LanguageTag;
use oxiri::Iri;
@ -27,10 +28,10 @@ use std::hash::{Hash, Hasher};
use std::iter::Iterator;
use std::iter::{empty, once};
use std::rc::Rc;
use std::str;
use std::time::Duration as StdDuration;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use std::time::Instant;
use std::{fmt, io, str};
const REGEX_SIZE_LIMIT: usize = 1_000_000;
@ -68,9 +69,9 @@ impl SimpleEvaluator {
#[allow(clippy::rc_buffer)]
pub fn evaluate_select_plan(
&self,
plan: Rc<PlanNode>,
plan: &PlanNode,
variables: Rc<Vec<Variable>>,
) -> (QueryResults, Rc<PlanNodeWithStats>) {
) -> (QueryResults, Rc<EvalNodeWithStats>) {
let (eval, stats) = self.plan_evaluator(plan);
(
QueryResults::Solutions(decode_bindings(
@ -84,8 +85,8 @@ impl SimpleEvaluator {
pub fn evaluate_ask_plan(
&self,
plan: Rc<PlanNode>,
) -> (Result<QueryResults, EvaluationError>, Rc<PlanNodeWithStats>) {
plan: &PlanNode,
) -> (Result<QueryResults, EvaluationError>, Rc<EvalNodeWithStats>) {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
let (eval, stats) = self.plan_evaluator(plan);
(
@ -100,9 +101,9 @@ impl SimpleEvaluator {
pub fn evaluate_construct_plan(
&self,
plan: Rc<PlanNode>,
plan: &PlanNode,
template: Vec<TripleTemplate>,
) -> (QueryResults, Rc<PlanNodeWithStats>) {
) -> (QueryResults, Rc<EvalNodeWithStats>) {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
let (eval, stats) = self.plan_evaluator(plan);
(
@ -119,10 +120,7 @@ impl SimpleEvaluator {
)
}
pub fn evaluate_describe_plan(
&self,
plan: Rc<PlanNode>,
) -> (QueryResults, Rc<PlanNodeWithStats>) {
pub fn evaluate_describe_plan(&self, plan: &PlanNode) -> (QueryResults, Rc<EvalNodeWithStats>) {
let from = EncodedTuple::with_capacity(plan.used_variables().len());
let (eval, stats) = self.plan_evaluator(plan);
(
@ -139,13 +137,13 @@ impl SimpleEvaluator {
pub fn plan_evaluator(
&self,
node: Rc<PlanNode>,
node: &PlanNode,
) -> (
Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator>,
Rc<PlanNodeWithStats>,
Rc<EvalNodeWithStats>,
) {
let mut stat_children = Vec::new();
let mut evaluator: Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> = match node.as_ref() {
let mut evaluator: Rc<dyn Fn(EncodedTuple) -> EncodedTuplesIterator> = match node {
PlanNode::StaticBindings { encoded_tuples, .. } => {
let tuples = encoded_tuples.clone();
Rc::new(move |from| {
@ -388,9 +386,9 @@ impl SimpleEvaluator {
.intersection(&build_child.always_bound_variables())
.copied()
.collect();
let (probe, probe_stats) = self.plan_evaluator(Rc::clone(probe_child));
let (probe, probe_stats) = self.plan_evaluator(probe_child);
stat_children.push(probe_stats);
let (build, build_stats) = self.plan_evaluator(Rc::clone(build_child));
let (build, build_stats) = self.plan_evaluator(build_child);
stat_children.push(build_stats);
if join_keys.is_empty() {
// Cartesian product
@ -434,9 +432,9 @@ impl SimpleEvaluator {
}
}
PlanNode::ForLoopJoin { left, right } => {
let (left, left_stats) = self.plan_evaluator(Rc::clone(left));
let (left, left_stats) = self.plan_evaluator(left);
stat_children.push(left_stats);
let (right, right_stats) = self.plan_evaluator(Rc::clone(right));
let (right, right_stats) = self.plan_evaluator(right);
stat_children.push(right_stats);
Rc::new(move |from| {
let right = Rc::clone(&right);
@ -452,9 +450,9 @@ impl SimpleEvaluator {
.intersection(&right.always_bound_variables())
.copied()
.collect();
let (left, left_stats) = self.plan_evaluator(Rc::clone(left));
let (left, left_stats) = self.plan_evaluator(left);
stat_children.push(left_stats);
let (right, right_stats) = self.plan_evaluator(Rc::clone(right));
let (right, right_stats) = self.plan_evaluator(right);
stat_children.push(right_stats);
if join_keys.is_empty() {
Rc::new(move |from| {
@ -495,9 +493,9 @@ impl SimpleEvaluator {
.intersection(&right.always_bound_variables())
.copied()
.collect();
let (left, left_stats) = self.plan_evaluator(Rc::clone(left));
let (left, left_stats) = self.plan_evaluator(left);
stat_children.push(left_stats);
let (right, right_stats) = self.plan_evaluator(Rc::clone(right));
let (right, right_stats) = self.plan_evaluator(right);
stat_children.push(right_stats);
let expression = self.expression_evaluator(expression, &mut stat_children);
// Real hash join
@ -520,9 +518,9 @@ impl SimpleEvaluator {
})
}
PlanNode::ForLoopLeftJoin { left, right } => {
let (left, left_stats) = self.plan_evaluator(Rc::clone(left));
let (left, left_stats) = self.plan_evaluator(left);
stat_children.push(left_stats);
let (right, right_stats) = self.plan_evaluator(Rc::clone(right));
let (right, right_stats) = self.plan_evaluator(right);
stat_children.push(right_stats);
Rc::new(move |from| {
Box::new(ForLoopLeftJoinIterator {
@ -533,7 +531,7 @@ impl SimpleEvaluator {
})
}
PlanNode::Filter { child, expression } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let expression = self.expression_evaluator(expression, &mut stat_children);
Rc::new(move |from| {
@ -552,7 +550,7 @@ impl SimpleEvaluator {
let children: Vec<_> = children
.iter()
.map(|child| {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
child
})
@ -571,7 +569,7 @@ impl SimpleEvaluator {
variable,
expression,
} => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let position = variable.encoded;
let expression = self.expression_evaluator(expression, &mut stat_children);
@ -587,7 +585,7 @@ impl SimpleEvaluator {
})
}
PlanNode::Sort { child, by } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let by: Vec<_> = by
.iter()
@ -645,12 +643,12 @@ impl SimpleEvaluator {
})
}
PlanNode::HashDeduplicate { child } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
Rc::new(move |from| Box::new(hash_deduplicate(child(from))))
}
PlanNode::Reduced { child } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
Rc::new(move |from| {
Box::new(ConsecutiveDeduplication {
@ -660,19 +658,19 @@ impl SimpleEvaluator {
})
}
PlanNode::Skip { child, count } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let count = *count;
Rc::new(move |from| Box::new(child(from).skip(count)))
}
PlanNode::Limit { child, count } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let count = *count;
Rc::new(move |from| Box::new(child(from).take(count)))
}
PlanNode::Project { child, mapping } => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let mapping = Rc::clone(mapping);
Rc::new(move |from| {
@ -712,7 +710,7 @@ impl SimpleEvaluator {
key_variables,
aggregates,
} => {
let (child, child_stats) = self.plan_evaluator(Rc::clone(child));
let (child, child_stats) = self.plan_evaluator(child);
stat_children.push(child_stats);
let key_variables = Rc::clone(key_variables);
let aggregate_input_expressions: Vec<_> = aggregates
@ -806,11 +804,11 @@ impl SimpleEvaluator {
})
}
};
let stats = Rc::new(PlanNodeWithStats {
node,
let stats = Rc::new(EvalNodeWithStats {
label: eval_node_label(node),
children: stat_children,
exec_count: Cell::new(0),
exec_duration: Cell::new(std::time::Duration::from_secs(0)),
exec_duration: Cell::new(StdDuration::from_secs(0)),
});
if self.run_stats {
let stats = Rc::clone(&stats);
@ -920,7 +918,7 @@ impl SimpleEvaluator {
fn expression_evaluator(
&self,
expression: &PlanExpression,
stat_children: &mut Vec<Rc<PlanNodeWithStats>>,
stat_children: &mut Vec<Rc<EvalNodeWithStats>>,
) -> Rc<dyn Fn(&EncodedTuple) -> Option<EncodedTerm>> {
match expression {
PlanExpression::NamedNode(t) => {
@ -936,7 +934,7 @@ impl SimpleEvaluator {
Rc::new(move |tuple| tuple.get(v).cloned())
}
PlanExpression::Exists(plan) => {
let (eval, stats) = self.plan_evaluator(Rc::clone(plan));
let (eval, stats) = self.plan_evaluator(plan);
stat_children.push(stats);
Rc::new(move |tuple| Some(eval(tuple.clone()).next().is_some().into()))
}
@ -2086,7 +2084,7 @@ impl SimpleEvaluator {
fn hash<H: Digest>(
&self,
arg: &PlanExpression,
stat_children: &mut Vec<Rc<PlanNodeWithStats>>,
stat_children: &mut Vec<Rc<EvalNodeWithStats>>,
) -> Rc<dyn Fn(&EncodedTuple) -> Option<EncodedTerm>> {
let arg = self.expression_evaluator(arg, stat_children);
let dataset = Rc::clone(&self.dataset);
@ -4719,7 +4717,7 @@ impl Extend<EncodedTuple> for EncodedTupleSet {
struct StatsIterator {
inner: EncodedTuplesIterator,
stats: Rc<PlanNodeWithStats>,
stats: Rc<EvalNodeWithStats>,
}
impl Iterator for StatsIterator {
@ -4738,6 +4736,145 @@ impl Iterator for StatsIterator {
}
}
pub struct EvalNodeWithStats {
pub label: String,
pub children: Vec<Rc<EvalNodeWithStats>>,
pub exec_count: Cell<usize>,
pub exec_duration: Cell<StdDuration>,
}
impl EvalNodeWithStats {
pub fn json_node(
&self,
writer: &mut JsonWriter<impl io::Write>,
with_stats: bool,
) -> io::Result<()> {
writer.write_event(JsonEvent::StartObject)?;
writer.write_event(JsonEvent::ObjectKey("name"))?;
writer.write_event(JsonEvent::String(&self.label))?;
if with_stats {
writer.write_event(JsonEvent::ObjectKey("number of results"))?;
writer.write_event(JsonEvent::Number(&self.exec_count.get().to_string()))?;
writer.write_event(JsonEvent::ObjectKey("duration in seconds"))?;
writer.write_event(JsonEvent::Number(
&self.exec_duration.get().as_secs_f32().to_string(),
))?;
}
writer.write_event(JsonEvent::ObjectKey("children"))?;
writer.write_event(JsonEvent::StartArray)?;
for child in &self.children {
child.json_node(writer, with_stats)?;
}
writer.write_event(JsonEvent::EndArray)?;
writer.write_event(JsonEvent::EndObject)
}
}
impl fmt::Debug for EvalNodeWithStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut obj = f.debug_struct("Node");
obj.field("name", &self.label);
if self.exec_duration.get() > StdDuration::default() {
obj.field("number of results", &self.exec_count.get());
obj.field("duration in seconds", &self.exec_duration.get());
}
if !self.children.is_empty() {
obj.field("children", &self.children);
}
obj.finish()
}
}
fn eval_node_label(node: &PlanNode) -> String {
match node {
PlanNode::Aggregate {
key_variables,
aggregates,
..
} => format!(
"Aggregate({})",
key_variables
.iter()
.map(ToString::to_string)
.chain(aggregates.iter().map(|(agg, v)| format!("{agg} -> {v}")))
.collect::<Vec<_>>()
.join(", ")
),
PlanNode::AntiJoin { .. } => "AntiJoin".to_owned(),
PlanNode::Extend {
expression,
variable,
..
} => format!("Extend({expression} -> {variable})"),
PlanNode::Filter { expression, .. } => format!("Filter({expression})"),
PlanNode::ForLoopJoin { .. } => "ForLoopJoin".to_owned(),
PlanNode::ForLoopLeftJoin { .. } => "ForLoopLeftJoin".to_owned(),
PlanNode::HashDeduplicate { .. } => "HashDeduplicate".to_owned(),
PlanNode::HashJoin { .. } => "HashJoin".to_owned(),
PlanNode::HashLeftJoin { expression, .. } => format!("HashLeftJoin({expression})"),
PlanNode::Limit { count, .. } => format!("Limit({count})"),
PlanNode::PathPattern {
subject,
path,
object,
graph_name,
} => format!("PathPattern({subject} {path} {object} {graph_name})"),
PlanNode::Project { mapping, .. } => {
format!(
"Project({})",
mapping
.iter()
.map(|(f, t)| if f.plain == t.plain {
f.to_string()
} else {
format!("{f} -> {t}")
})
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::QuadPattern {
subject,
predicate,
object,
graph_name,
} => format!("QuadPattern({subject} {predicate} {object} {graph_name})"),
PlanNode::Reduced { .. } => "Reduced".to_owned(),
PlanNode::Service {
service_name,
silent,
..
} => {
if *silent {
format!("SilentService({service_name})")
} else {
format!("Service({service_name})")
}
}
PlanNode::Skip { count, .. } => format!("Skip({count})"),
PlanNode::Sort { by, .. } => {
format!(
"Sort({})",
by.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::StaticBindings { variables, .. } => {
format!(
"StaticBindings({})",
variables
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::Union { .. } => "Union".to_owned(),
}
}
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
pub struct Timer {
timestamp_ms: f64,

@ -17,9 +17,8 @@ use crate::model::{NamedNode, Term};
pub use crate::sparql::algebra::{Query, QueryDataset, Update};
use crate::sparql::dataset::DatasetView;
pub use crate::sparql::error::{EvaluationError, QueryError};
use crate::sparql::eval::{SimpleEvaluator, Timer};
use crate::sparql::eval::{EvalNodeWithStats, SimpleEvaluator, Timer};
pub use crate::sparql::model::{QueryResults, QuerySolution, QuerySolutionIter, QueryTripleIter};
use crate::sparql::plan::PlanNodeWithStats;
use crate::sparql::plan_builder::PlanBuilder;
pub use crate::sparql::service::ServiceHandler;
use crate::sparql::service::{EmptyServiceHandler, ErrorConversionServiceHandler};
@ -63,7 +62,7 @@ pub(crate) fn evaluate_query(
Rc::new(options.custom_functions),
run_stats,
)
.evaluate_select_plan(Rc::new(plan), Rc::new(variables));
.evaluate_select_plan(&plan, Rc::new(variables));
(Ok(results), explanation, planning_duration)
}
spargebra::Query::Ask {
@ -84,7 +83,7 @@ pub(crate) fn evaluate_query(
Rc::new(options.custom_functions),
run_stats,
)
.evaluate_ask_plan(Rc::new(plan));
.evaluate_ask_plan(&plan);
(results, explanation, planning_duration)
}
spargebra::Query::Construct {
@ -114,7 +113,7 @@ pub(crate) fn evaluate_query(
Rc::new(options.custom_functions),
run_stats,
)
.evaluate_construct_plan(Rc::new(plan), construct);
.evaluate_construct_plan(&plan, construct);
(Ok(results), explanation, planning_duration)
}
spargebra::Query::Describe {
@ -135,7 +134,7 @@ pub(crate) fn evaluate_query(
Rc::new(options.custom_functions),
run_stats,
)
.evaluate_describe_plan(Rc::new(plan));
.evaluate_describe_plan(&plan);
(Ok(results), explanation, planning_duration)
}
};
@ -284,7 +283,7 @@ impl From<QueryOptions> for UpdateOptions {
/// The explanation of a query.
#[derive(Clone)]
pub struct QueryExplanation {
inner: Rc<PlanNodeWithStats>,
inner: Rc<EvalNodeWithStats>,
with_stats: bool,
parsing_duration: Option<Duration>,
planning_duration: Duration,

@ -1,17 +1,14 @@
use crate::model::{BlankNode, Literal, NamedNode, Term, Triple};
use crate::sparql::Variable;
use crate::storage::numeric_encoder::EncodedTerm;
use json_event_parser::{JsonEvent, JsonWriter};
use regex::Regex;
use spargebra::algebra::GraphPattern;
use spargebra::term::GroundTerm;
use std::cell::Cell;
use std::cmp::max;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::rc::Rc;
use std::time::Duration;
use std::{fmt, io};
#[derive(Debug, Clone)]
pub enum PlanNode {
@ -1022,142 +1019,3 @@ impl IntoIterator for EncodedTuple {
self.inner.into_iter()
}
}
pub struct PlanNodeWithStats {
pub node: Rc<PlanNode>,
pub children: Vec<Rc<PlanNodeWithStats>>,
pub exec_count: Cell<usize>,
pub exec_duration: Cell<Duration>,
}
impl PlanNodeWithStats {
pub fn json_node(
&self,
writer: &mut JsonWriter<impl io::Write>,
with_stats: bool,
) -> io::Result<()> {
writer.write_event(JsonEvent::StartObject)?;
writer.write_event(JsonEvent::ObjectKey("name"))?;
writer.write_event(JsonEvent::String(&self.node_label()))?;
if with_stats {
writer.write_event(JsonEvent::ObjectKey("number of results"))?;
writer.write_event(JsonEvent::Number(&self.exec_count.get().to_string()))?;
writer.write_event(JsonEvent::ObjectKey("duration in seconds"))?;
writer.write_event(JsonEvent::Number(
&self.exec_duration.get().as_secs_f32().to_string(),
))?;
}
writer.write_event(JsonEvent::ObjectKey("children"))?;
writer.write_event(JsonEvent::StartArray)?;
for child in &self.children {
child.json_node(writer, with_stats)?;
}
writer.write_event(JsonEvent::EndArray)?;
writer.write_event(JsonEvent::EndObject)
}
fn node_label(&self) -> String {
match self.node.as_ref() {
PlanNode::Aggregate {
key_variables,
aggregates,
..
} => format!(
"Aggregate({})",
key_variables
.iter()
.map(ToString::to_string)
.chain(aggregates.iter().map(|(agg, v)| format!("{agg} -> {v}")))
.collect::<Vec<_>>()
.join(", ")
),
PlanNode::AntiJoin { .. } => "AntiJoin".to_owned(),
PlanNode::Extend {
expression,
variable,
..
} => format!("Extend({expression} -> {variable})"),
PlanNode::Filter { expression, .. } => format!("Filter({expression})"),
PlanNode::ForLoopJoin { .. } => "ForLoopJoin".to_owned(),
PlanNode::ForLoopLeftJoin { .. } => "ForLoopLeftJoin".to_owned(),
PlanNode::HashDeduplicate { .. } => "HashDeduplicate".to_owned(),
PlanNode::HashJoin { .. } => "HashJoin".to_owned(),
PlanNode::HashLeftJoin { expression, .. } => format!("HashLeftJoin({expression})"),
PlanNode::Limit { count, .. } => format!("Limit({count})"),
PlanNode::PathPattern {
subject,
path,
object,
graph_name,
} => format!("PathPattern({subject} {path} {object} {graph_name})"),
PlanNode::Project { mapping, .. } => {
format!(
"Project({})",
mapping
.iter()
.map(|(f, t)| if f.plain == t.plain {
f.to_string()
} else {
format!("{f} -> {t}")
})
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::QuadPattern {
subject,
predicate,
object,
graph_name,
} => format!("QuadPattern({subject} {predicate} {object} {graph_name})"),
PlanNode::Reduced { .. } => "Reduced".to_owned(),
PlanNode::Service {
service_name,
silent,
..
} => {
if *silent {
format!("SilentService({service_name})")
} else {
format!("Service({service_name})")
}
}
PlanNode::Skip { count, .. } => format!("Skip({count})"),
PlanNode::Sort { by, .. } => {
format!(
"Sort({})",
by.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::StaticBindings { variables, .. } => {
format!(
"StaticBindings({})",
variables
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
)
}
PlanNode::Union { .. } => "Union".to_owned(),
}
}
}
impl fmt::Debug for PlanNodeWithStats {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut obj = f.debug_struct("Node");
obj.field("name", &self.node_label());
if self.exec_duration.get() > Duration::default() {
obj.field("number of results", &self.exec_count.get());
obj.field("duration in seconds", &self.exec_duration.get());
}
if !self.children.is_empty() {
obj.field("children", &self.children);
}
obj.finish()
}
}

@ -140,7 +140,7 @@ impl<'a, 'b: 'a> SimpleUpdateEvaluator<'a, 'b> {
false,
);
let mut bnodes = HashMap::new();
let (eval, _) = evaluator.plan_evaluator(Rc::new(plan));
let (eval, _) = evaluator.plan_evaluator(&plan);
let tuples =
eval(EncodedTuple::with_capacity(variables.len())).collect::<Result<Vec<_>, _>>()?; //TODO: would be much better to stream
for tuple in tuples {

Loading…
Cancel
Save