From e104b32fcd6759d3134d396f4f002f29cc9f9b39 Mon Sep 17 00:00:00 2001
From: Tpt <thomaspt@hotmail.fr>
Date: Fri, 5 Jan 2024 09:38:47 +0100
Subject: [PATCH] Adds an async SPARQL JSON results reader

---
 lib/sparesults/src/json.rs   | 1238 ++++++++++++++++++++++------------
 lib/sparesults/src/lib.rs    |    4 +
 lib/sparesults/src/parser.rs |  234 ++++++-
 3 files changed, 1013 insertions(+), 463 deletions(-)

diff --git a/lib/sparesults/src/json.rs b/lib/sparesults/src/json.rs
index 25da2d64..efed124a 100644
--- a/lib/sparesults/src/json.rs
+++ b/lib/sparesults/src/json.rs
@@ -1,20 +1,16 @@
 //! Implementation of [SPARQL Query Results JSON Format](https://www.w3.org/TR/sparql11-results-json/)
 
 use crate::error::{QueryResultsParseError, QueryResultsSyntaxError};
-#[cfg(feature = "async-tokio")]
-use json_event_parser::ToTokioAsyncWriteJsonWriter;
 use json_event_parser::{FromReadJsonReader, JsonEvent, ToWriteJsonWriter};
+#[cfg(feature = "async-tokio")]
+use json_event_parser::{FromTokioAsyncReadJsonReader, ToTokioAsyncWriteJsonWriter};
 use oxrdf::vocab::rdf;
 use oxrdf::*;
 use std::collections::BTreeMap;
 use std::io::{self, Read, Write};
 use std::mem::take;
 #[cfg(feature = "async-tokio")]
-use tokio::io::AsyncWrite;
-
-/// This limit is set in order to avoid stack overflow error when parsing nested triples due to too many recursive calls.
-/// The actual limit value is a wet finger compromise between not failing to parse valid files and avoiding to trigger stack overflow errors.
-const MAX_NUMBER_OF_NESTED_TRIPLES: usize = 128;
+use tokio::io::{AsyncRead, AsyncWrite};
 
 pub fn write_boolean_json_result<W: Write>(write: W, value: bool) -> io::Result<W> {
     let mut writer = ToWriteJsonWriter::new(write);
@@ -224,520 +220,882 @@ fn write_json_term<'a>(output: &mut Vec<JsonEvent<'a>>, term: TermRef<'a>) {
     }
 }
 
-pub enum JsonQueryResultsReader<R: Read> {
+pub enum FromReadJsonQueryResultsReader<R: Read> {
     Solutions {
         variables: Vec<Variable>,
-        solutions: JsonSolutionsReader<R>,
+        solutions: FromReadJsonSolutionsReader<R>,
     },
     Boolean(bool),
 }
 
-impl<R: Read> JsonQueryResultsReader<R> {
+impl<R: Read> FromReadJsonQueryResultsReader<R> {
     pub fn read(read: R) -> Result<Self, QueryResultsParseError> {
         let mut reader = FromReadJsonReader::new(read);
-        let mut variables = None;
-        let mut buffered_bindings: Option<Vec<_>> = None;
-        let mut output_iter = None;
-
-        if reader.read_next_event()? != JsonEvent::StartObject {
-            return Err(
-                QueryResultsSyntaxError::msg("SPARQL JSON results should be an object").into(),
-            );
+        let mut inner = JsonInnerReader::new();
+        loop {
+            if let Some(result) = inner.read_event(reader.read_next_event()?)? {
+                return match result {
+                    JsonInnerQueryResults::Solutions {
+                        variables,
+                        solutions,
+                    } => Ok(Self::Solutions {
+                        variables,
+                        solutions: FromReadJsonSolutionsReader {
+                            inner: solutions,
+                            reader,
+                        },
+                    }),
+                    JsonInnerQueryResults::Boolean(value) => Ok(Self::Boolean(value)),
+                };
+            }
         }
+    }
+}
+
+pub struct FromReadJsonSolutionsReader<R: Read> {
+    inner: JsonInnerSolutions,
+    reader: FromReadJsonReader<R>,
+}
+
+impl<R: Read> FromReadJsonSolutionsReader<R> {
+    pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
+        match &mut self.inner {
+            JsonInnerSolutions::Reader(reader) => loop {
+                let event = self.reader.read_next_event()?;
+                if event == JsonEvent::Eof {
+                    return Ok(None);
+                }
+                if let Some(result) = reader.read_event(event)? {
+                    return Ok(Some(result));
+                }
+            },
+            JsonInnerSolutions::Iterator(iter) => iter.next(),
+        }
+    }
+}
 
+#[cfg(feature = "async-tokio")]
+pub enum FromTokioAsyncReadJsonQueryResultsReader<R: AsyncRead + Unpin> {
+    Solutions {
+        variables: Vec<Variable>,
+        solutions: FromTokioAsyncReadJsonSolutionsReader<R>,
+    },
+    Boolean(bool),
+}
+
+#[cfg(feature = "async-tokio")]
+impl<R: AsyncRead + Unpin> FromTokioAsyncReadJsonQueryResultsReader<R> {
+    pub async fn read(read: R) -> Result<Self, QueryResultsParseError> {
+        let mut reader = FromTokioAsyncReadJsonReader::new(read);
+        let mut inner = JsonInnerReader::new();
         loop {
-            let event = reader.read_next_event()?;
-            match event {
+            if let Some(result) = inner.read_event(reader.read_next_event().await?)? {
+                return match result {
+                    JsonInnerQueryResults::Solutions {
+                        variables,
+                        solutions,
+                    } => Ok(Self::Solutions {
+                        variables,
+                        solutions: FromTokioAsyncReadJsonSolutionsReader {
+                            inner: solutions,
+                            reader,
+                        },
+                    }),
+                    JsonInnerQueryResults::Boolean(value) => Ok(Self::Boolean(value)),
+                };
+            }
+        }
+    }
+}
+
+#[cfg(feature = "async-tokio")]
+pub struct FromTokioAsyncReadJsonSolutionsReader<R: AsyncRead + Unpin> {
+    inner: JsonInnerSolutions,
+    reader: FromTokioAsyncReadJsonReader<R>,
+}
+
+#[cfg(feature = "async-tokio")]
+impl<R: AsyncRead + Unpin> FromTokioAsyncReadJsonSolutionsReader<R> {
+    pub async fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
+        match &mut self.inner {
+            JsonInnerSolutions::Reader(reader) => loop {
+                let event = self.reader.read_next_event().await?;
+                if event == JsonEvent::Eof {
+                    return Ok(None);
+                }
+                if let Some(result) = reader.read_event(event)? {
+                    return Ok(Some(result));
+                }
+            },
+            JsonInnerSolutions::Iterator(iter) => iter.next(),
+        }
+    }
+}
+
+enum JsonInnerQueryResults {
+    Solutions {
+        variables: Vec<Variable>,
+        solutions: JsonInnerSolutions,
+    },
+    Boolean(bool),
+}
+
+enum JsonInnerSolutions {
+    Reader(JsonInnerSolutionsReader),
+    Iterator(JsonBufferedSolutionsIterator),
+}
+
+struct JsonInnerReader {
+    state: JsonInnerReaderState,
+    variables: Vec<Variable>,
+    current_solution_variables: Vec<String>,
+    current_solution_values: Vec<Term>,
+    solutions: Vec<(Vec<String>, Vec<Term>)>,
+    vars_read: bool,
+    solutions_read: bool,
+}
+
+enum JsonInnerReaderState {
+    Start,
+    InRootObject,
+    BeforeHead,
+    InHead,
+    BeforeVars,
+    InVars,
+    BeforeLinks,
+    InLinks,
+    BeforeResults,
+    InResults,
+    BeforeBindings,
+    BeforeSolution,
+    BetweenSolutionTerms,
+    Term {
+        reader: JsonInnerTermReader,
+        variable: String,
+    },
+    AfterBindings,
+    BeforeBoolean,
+    Ignore {
+        level: usize,
+        after: JsonInnerReaderStateAfterIgnore,
+    },
+}
+
+#[allow(clippy::enum_variant_names)]
+#[derive(Clone, Copy)]
+enum JsonInnerReaderStateAfterIgnore {
+    InRootObject,
+    InHead,
+    InResults,
+    AfterBindings,
+}
+
+impl JsonInnerReader {
+    fn new() -> Self {
+        Self {
+            state: JsonInnerReaderState::Start,
+            variables: Vec::new(),
+            current_solution_variables: Vec::new(),
+            current_solution_values: Vec::new(),
+            solutions: Vec::new(),
+            vars_read: false,
+            solutions_read: false,
+        }
+    }
+
+    fn read_event(
+        &mut self,
+        event: JsonEvent<'_>,
+    ) -> Result<Option<JsonInnerQueryResults>, QueryResultsSyntaxError> {
+        match &mut self.state {
+            JsonInnerReaderState::Start => {
+                if event == JsonEvent::StartObject {
+                    self.state = JsonInnerReaderState::InRootObject;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results must be an object",
+                    ))
+                }
+            }
+            JsonInnerReaderState::InRootObject => match event {
                 JsonEvent::ObjectKey(key) => match key.as_ref() {
                     "head" => {
-                        let extracted_variables = read_head(&mut reader)?;
-                        if let Some(buffered_bindings) = buffered_bindings.take() {
-                            let mut mapping = BTreeMap::default();
-                            for (i, var) in extracted_variables.iter().enumerate() {
-                                mapping.insert(var.as_str().to_owned(), i);
-                            }
-                            output_iter = Some(Self::Solutions {
-                                variables: extracted_variables,
-                                solutions: JsonSolutionsReader {
-                                    kind: JsonSolutionsReaderKind::Buffered {
-                                        bindings: buffered_bindings.into_iter(),
-                                    },
-                                    mapping,
-                                },
-                            });
-                        } else {
-                            variables = Some(extracted_variables);
-                        }
+                        self.state = JsonInnerReaderState::BeforeHead;
+                        Ok(None)
                     }
                     "results" => {
-                        if reader.read_next_event()? != JsonEvent::StartObject {
-                            return Err(QueryResultsSyntaxError::msg(
-                                "'results' should be an object",
-                            )
-                            .into());
-                        }
-                        loop {
-                            match reader.read_next_event()? {
-                                JsonEvent::ObjectKey(k) if k == "bindings" => break, // Found
-                                JsonEvent::ObjectKey(_) => ignore_value(&mut reader)?,
-                                _ => {
-                                    return Err(QueryResultsSyntaxError::msg(
-                                        "'results' should contain a 'bindings' key",
-                                    )
-                                    .into())
-                                }
-                            }
-                        }
-                        if reader.read_next_event()? != JsonEvent::StartArray {
-                            return Err(QueryResultsSyntaxError::msg(
-                                "'bindings' should be an object",
-                            )
-                            .into());
-                        }
-                        if let Some(variables) = variables {
-                            let mut mapping = BTreeMap::default();
-                            for (i, var) in variables.iter().enumerate() {
-                                mapping.insert(var.as_str().to_owned(), i);
-                            }
-                            return Ok(Self::Solutions {
-                                variables,
-                                solutions: JsonSolutionsReader {
-                                    kind: JsonSolutionsReaderKind::Streaming { reader },
-                                    mapping,
-                                },
-                            });
-                        }
-                        // We buffer all results before being able to read the header
-                        let mut bindings = Vec::new();
-                        let mut variables = Vec::new();
-                        let mut values = Vec::new();
-                        loop {
-                            match reader.read_next_event()? {
-                                JsonEvent::StartObject => (),
-                                JsonEvent::EndObject => {
-                                    bindings.push((take(&mut variables), take(&mut values)));
-                                }
-                                JsonEvent::EndArray | JsonEvent::Eof => {
-                                    buffered_bindings = Some(bindings);
-                                    break;
-                                }
-                                JsonEvent::ObjectKey(key) => {
-                                    variables.push(key.into_owned());
-                                    values.push(read_value(&mut reader, 0)?);
-                                }
-                                _ => {
-                                    return Err(QueryResultsSyntaxError::msg(
-                                        "Invalid result serialization",
-                                    )
-                                    .into())
-                                }
-                            }
-                        }
+                        self.state = JsonInnerReaderState::BeforeResults;
+                        Ok(None)
                     }
                     "boolean" => {
-                        return if let JsonEvent::Boolean(v) = reader.read_next_event()? {
-                            Ok(Self::Boolean(v))
-                        } else {
-                            Err(QueryResultsSyntaxError::msg("Unexpected boolean value").into())
-                        }
+                        self.state = JsonInnerReaderState::BeforeBoolean;
+                        Ok(None)
                     }
                     _ => {
-                        return Err(QueryResultsSyntaxError::msg(format!(
-                            "Expecting head or result key, found {key}"
-                        ))
-                        .into());
+                        self.state = JsonInnerReaderState::Ignore {
+                            level: 0,
+                            after: JsonInnerReaderStateAfterIgnore::InRootObject,
+                        };
+                        Ok(None)
+                    }
+                },
+                JsonEvent::EndObject => Err(QueryResultsSyntaxError::msg(
+                    "SPARQL JSON results must contain a 'boolean' or a 'results' key",
+                )),
+                _ => unreachable!(),
+            },
+            JsonInnerReaderState::BeforeHead => {
+                if event == JsonEvent::StartObject {
+                    self.state = JsonInnerReaderState::InHead;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results head must be an object",
+                    ))
+                }
+            }
+            JsonInnerReaderState::InHead => match event {
+                JsonEvent::ObjectKey(key) => match key.as_ref() {
+                    "vars" => {
+                        self.state = JsonInnerReaderState::BeforeVars;
+                        self.vars_read = true;
+                        Ok(None)
+                    }
+                    "links" => {
+                        self.state = JsonInnerReaderState::BeforeLinks;
+                        Ok(None)
+                    }
+                    _ => {
+                        self.state = JsonInnerReaderState::Ignore {
+                            level: 0,
+                            after: JsonInnerReaderStateAfterIgnore::InHead,
+                        };
+                        Ok(None)
+                    }
+                },
+                JsonEvent::EndObject => {
+                    self.state = JsonInnerReaderState::InRootObject;
+                    Ok(None)
+                }
+                _ => unreachable!(),
+            },
+            JsonInnerReaderState::BeforeVars => {
+                if event == JsonEvent::StartArray {
+                    self.state = JsonInnerReaderState::InVars;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results vars must be an array",
+                    ))
+                }
+            }
+            JsonInnerReaderState::InVars => match event {
+                JsonEvent::String(variable) => match Variable::new(variable.clone()) {
+                    Ok(var) => {
+                        if self.variables.contains(&var) {
+                            return Err(QueryResultsSyntaxError::msg(format!(
+                                "The variable {var} is declared twice"
+                            )));
+                        }
+                        self.variables.push(var);
+                        Ok(None)
                     }
+                    Err(e) => Err(QueryResultsSyntaxError::msg(format!(
+                        "Invalid variable name '{variable}': {e}"
+                    ))),
                 },
-                JsonEvent::EndObject => (),
-                JsonEvent::Eof => {
-                    return if let Some(output_iter) = output_iter {
-                        Ok(output_iter)
+                JsonEvent::EndArray => {
+                    if self.solutions_read {
+                        let mut mapping = BTreeMap::default();
+                        for (i, var) in self.variables.iter().enumerate() {
+                            mapping.insert(var.as_str().to_owned(), i);
+                        }
+                        Ok(Some(JsonInnerQueryResults::Solutions {
+                            variables: take(&mut self.variables),
+                            solutions: JsonInnerSolutions::Iterator(
+                                JsonBufferedSolutionsIterator {
+                                    mapping,
+                                    bindings: take(&mut self.solutions).into_iter(),
+                                },
+                            ),
+                        }))
                     } else {
-                        Err(QueryResultsSyntaxError::msg(
-                            "Unexpected end of JSON object without 'results' or 'boolean' key",
-                        )
-                        .into())
+                        self.state = JsonInnerReaderState::InHead;
+                        Ok(None)
                     }
                 }
-                _ => {
-                    return Err(QueryResultsSyntaxError::msg(
-                        "Invalid SPARQL results serialization",
-                    )
-                    .into())
+                _ => Err(QueryResultsSyntaxError::msg(
+                    "Variables name in the vars array must be strings",
+                )),
+            },
+            JsonInnerReaderState::BeforeLinks => {
+                if event == JsonEvent::StartArray {
+                    self.state = JsonInnerReaderState::InLinks;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results links must be an array",
+                    ))
+                }
+            }
+            JsonInnerReaderState::InLinks => match event {
+                JsonEvent::String(_) => Ok(None),
+                JsonEvent::EndArray => {
+                    self.state = JsonInnerReaderState::InHead;
+                    Ok(None)
+                }
+                _ => Err(QueryResultsSyntaxError::msg(
+                    "Links in the links array must be strings",
+                )),
+            },
+            JsonInnerReaderState::BeforeResults => {
+                if event == JsonEvent::StartObject {
+                    self.state = JsonInnerReaderState::InResults;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results result must be an object",
+                    ))
+                }
+            }
+            JsonInnerReaderState::InResults => match event {
+                JsonEvent::ObjectKey(key) => {
+                    if key == "bindings" {
+                        self.state = JsonInnerReaderState::BeforeBindings;
+                        Ok(None)
+                    } else {
+                        self.state = JsonInnerReaderState::Ignore {
+                            level: 0,
+                            after: JsonInnerReaderStateAfterIgnore::InResults,
+                        };
+                        Ok(None)
+                    }
+                }
+                JsonEvent::EndObject => Err(QueryResultsSyntaxError::msg(
+                    "The results object must contains a 'bindings' key",
+                )),
+                _ => unreachable!(),
+            },
+            JsonInnerReaderState::BeforeBindings => {
+                if event == JsonEvent::StartArray {
+                    self.solutions_read = true;
+                    if self.vars_read {
+                        let mut mapping = BTreeMap::default();
+                        for (i, var) in self.variables.iter().enumerate() {
+                            mapping.insert(var.as_str().to_owned(), i);
+                        }
+                        Ok(Some(JsonInnerQueryResults::Solutions {
+                            variables: take(&mut self.variables),
+                            solutions: JsonInnerSolutions::Reader(JsonInnerSolutionsReader {
+                                state: JsonInnerSolutionsReaderState::BeforeSolution,
+                                mapping,
+                                new_bindings: Vec::new(),
+                            }),
+                        }))
+                    } else {
+                        self.state = JsonInnerReaderState::BeforeSolution;
+                        Ok(None)
+                    }
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "SPARQL JSON results bindings must be an array",
+                    ))
+                }
+            }
+            JsonInnerReaderState::BeforeSolution => match event {
+                JsonEvent::StartObject => {
+                    self.state = JsonInnerReaderState::BetweenSolutionTerms;
+                    Ok(None)
+                }
+                JsonEvent::EndArray => {
+                    self.state = JsonInnerReaderState::AfterBindings;
+                    Ok(None)
+                }
+                _ => Err(QueryResultsSyntaxError::msg(
+                    "Expecting a new solution object",
+                )),
+            },
+            JsonInnerReaderState::BetweenSolutionTerms => match event {
+                JsonEvent::ObjectKey(key) => {
+                    self.state = JsonInnerReaderState::Term {
+                        reader: JsonInnerTermReader::default(),
+                        variable: key.into(),
+                    };
+                    Ok(None)
+                }
+                JsonEvent::EndObject => {
+                    self.state = JsonInnerReaderState::BeforeSolution;
+                    self.solutions.push((
+                        take(&mut self.current_solution_variables),
+                        take(&mut self.current_solution_values),
+                    ));
+                    Ok(None)
+                }
+                _ => unreachable!(),
+            },
+            JsonInnerReaderState::Term {
+                ref mut reader,
+                variable,
+            } => {
+                let result = reader.read_event(event);
+                if let Some(term) = result? {
+                    self.current_solution_variables.push(take(variable));
+                    self.current_solution_values.push(term);
+                    self.state = JsonInnerReaderState::BetweenSolutionTerms;
                 }
+                Ok(None)
+            }
+            JsonInnerReaderState::AfterBindings => {
+                if event == JsonEvent::EndObject {
+                    self.state = JsonInnerReaderState::InRootObject;
+                } else {
+                    self.state = JsonInnerReaderState::Ignore {
+                        level: 0,
+                        after: JsonInnerReaderStateAfterIgnore::AfterBindings,
+                    }
+                }
+                Ok(None)
+            }
+            JsonInnerReaderState::BeforeBoolean => {
+                if let JsonEvent::Boolean(v) = event {
+                    Ok(Some(JsonInnerQueryResults::Boolean(v)))
+                } else {
+                    Err(QueryResultsSyntaxError::msg("Unexpected boolean value"))
+                }
+            }
+            #[allow(clippy::ref_patterns)]
+            JsonInnerReaderState::Ignore { level, ref after } => {
+                let level = match event {
+                    JsonEvent::StartArray | JsonEvent::StartObject => *level + 1,
+                    JsonEvent::EndArray | JsonEvent::EndObject => *level - 1,
+                    JsonEvent::String(_)
+                    | JsonEvent::Number(_)
+                    | JsonEvent::Boolean(_)
+                    | JsonEvent::Null
+                    | JsonEvent::ObjectKey(_)
+                    | JsonEvent::Eof => *level,
+                };
+                self.state = if level == 0 {
+                    match after {
+                        JsonInnerReaderStateAfterIgnore::InRootObject => {
+                            JsonInnerReaderState::InRootObject
+                        }
+                        JsonInnerReaderStateAfterIgnore::InHead => JsonInnerReaderState::InHead,
+                        JsonInnerReaderStateAfterIgnore::InResults => {
+                            JsonInnerReaderState::InResults
+                        }
+                        JsonInnerReaderStateAfterIgnore::AfterBindings => {
+                            JsonInnerReaderState::AfterBindings
+                        }
+                    }
+                } else {
+                    JsonInnerReaderState::Ignore {
+                        level,
+                        after: *after,
+                    }
+                };
+                Ok(None)
             }
         }
     }
 }
 
-pub struct JsonSolutionsReader<R: Read> {
+struct JsonInnerSolutionsReader {
+    state: JsonInnerSolutionsReaderState,
     mapping: BTreeMap<String, usize>,
-    kind: JsonSolutionsReaderKind<R>,
+    new_bindings: Vec<Option<Term>>,
 }
 
-enum JsonSolutionsReaderKind<R: Read> {
-    Streaming {
-        reader: FromReadJsonReader<R>,
-    },
-    Buffered {
-        bindings: std::vec::IntoIter<(Vec<String>, Vec<Term>)>,
+enum JsonInnerSolutionsReaderState {
+    BeforeSolution,
+    BetweenSolutionTerms,
+    Term {
+        reader: JsonInnerTermReader,
+        key: usize,
     },
+    AfterEnd,
 }
 
-impl<R: Read> JsonSolutionsReader<R> {
-    pub fn read_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
-        match &mut self.kind {
-            JsonSolutionsReaderKind::Streaming { reader } => {
-                let mut new_bindings = vec![None; self.mapping.len()];
-                loop {
-                    match reader.read_next_event()? {
-                        JsonEvent::StartObject => (),
-                        JsonEvent::EndObject => return Ok(Some(new_bindings)),
-                        JsonEvent::EndArray | JsonEvent::Eof => return Ok(None),
-                        JsonEvent::ObjectKey(key) => {
-                            let k = *self.mapping.get(key.as_ref()).ok_or_else(|| {
-                                QueryResultsSyntaxError::msg(format!(
-                                    "The variable {key} has not been defined in the header"
-                                ))
-                            })?;
-                            new_bindings[k] = Some(read_value(reader, 0)?)
-                        }
-                        _ => {
-                            return Err(QueryResultsSyntaxError::msg(
-                                "Invalid result serialization",
-                            )
-                            .into())
-                        }
-                    }
+impl JsonInnerSolutionsReader {
+    fn read_event(
+        &mut self,
+        event: JsonEvent<'_>,
+    ) -> Result<Option<Vec<Option<Term>>>, QueryResultsSyntaxError> {
+        match &mut self.state {
+            JsonInnerSolutionsReaderState::BeforeSolution => match event {
+                JsonEvent::StartObject => {
+                    self.state = JsonInnerSolutionsReaderState::BetweenSolutionTerms;
+                    self.new_bindings = vec![None; self.mapping.len()];
+                    Ok(None)
+                }
+                JsonEvent::EndArray => {
+                    self.state = JsonInnerSolutionsReaderState::AfterEnd;
+                    Ok(None)
                 }
+                _ => Err(QueryResultsSyntaxError::msg(
+                    "Expecting a new solution object",
+                )),
+            },
+            JsonInnerSolutionsReaderState::BetweenSolutionTerms => match event {
+                JsonEvent::ObjectKey(key) => {
+                    let key = *self.mapping.get(key.as_ref()).ok_or_else(|| {
+                        QueryResultsSyntaxError::msg(format!(
+                            "The variable {key} has not been defined in the header"
+                        ))
+                    })?;
+                    self.state = JsonInnerSolutionsReaderState::Term {
+                        reader: JsonInnerTermReader::default(),
+                        key,
+                    };
+                    Ok(None)
+                }
+                JsonEvent::EndObject => {
+                    self.state = JsonInnerSolutionsReaderState::BeforeSolution;
+                    Ok(Some(take(&mut self.new_bindings)))
+                }
+                _ => unreachable!(),
+            },
+            JsonInnerSolutionsReaderState::Term {
+                ref mut reader,
+                key,
+            } => {
+                let result = reader.read_event(event);
+                if let Some(term) = result? {
+                    self.new_bindings[*key] = Some(term);
+                    self.state = JsonInnerSolutionsReaderState::BetweenSolutionTerms;
+                }
+                Ok(None)
             }
-            JsonSolutionsReaderKind::Buffered { bindings } => {
-                Ok(if let Some((variables, values)) = bindings.next() {
-                    let mut new_bindings = vec![None; self.mapping.len()];
-                    for (variable, value) in variables.into_iter().zip(values) {
-                        let k = *self.mapping.get(&variable).ok_or_else(|| {
-                            QueryResultsSyntaxError::msg(format!(
-                                "The variable {variable} has not been defined in the header"
-                            ))
-                        })?;
-                        new_bindings[k] = Some(value)
-                    }
-                    Some(new_bindings)
+            JsonInnerSolutionsReaderState::AfterEnd => {
+                if event == JsonEvent::EndObject {
+                    Ok(None)
                 } else {
-                    None
-                })
+                    Err(QueryResultsSyntaxError::msg(
+                        "Unexpected JSON after the end of the bindings array",
+                    ))
+                }
             }
         }
     }
 }
 
-fn read_value<R: Read>(
-    reader: &mut FromReadJsonReader<R>,
-    number_of_recursive_calls: usize,
-) -> Result<Term, QueryResultsParseError> {
-    enum Type {
-        Uri,
-        BNode,
-        Literal,
-        #[cfg(feature = "rdf-star")]
-        Triple,
-    }
-    #[derive(Eq, PartialEq)]
-    enum State {
-        Type,
-        Value,
-        Lang,
-        Datatype,
-    }
+#[derive(Default)]
+struct JsonInnerTermReader {
+    state: JsonInnerTermReaderState,
+    term_type: Option<TermType>,
+    value: Option<String>,
+    lang: Option<String>,
+    datatype: Option<NamedNode>,
+    #[cfg(feature = "rdf-star")]
+    subject: Option<Term>,
+    #[cfg(feature = "rdf-star")]
+    predicate: Option<Term>,
+    #[cfg(feature = "rdf-star")]
+    object: Option<Term>,
+}
 
-    if number_of_recursive_calls == MAX_NUMBER_OF_NESTED_TRIPLES {
-        return Err(QueryResultsSyntaxError::msg(format!(
-            "Too many nested triples ({MAX_NUMBER_OF_NESTED_TRIPLES}). The parser fails here to avoid a stack overflow."
-        ))
-            .into());
-    }
-    let mut state = None;
-    let mut t = None;
-    let mut value = None;
-    let mut lang = None;
-    let mut datatype = None;
+#[derive(Default)]
+enum JsonInnerTermReaderState {
+    #[default]
+    Start,
+    Middle,
+    TermType,
+    Value,
+    Lang,
+    Datatype,
     #[cfg(feature = "rdf-star")]
-    let mut subject = None;
+    InValue,
     #[cfg(feature = "rdf-star")]
-    let mut predicate = None;
+    Subject(Box<JsonInnerTermReader>),
     #[cfg(feature = "rdf-star")]
-    let mut object = None;
-    if reader.read_next_event()? != JsonEvent::StartObject {
-        return Err(QueryResultsSyntaxError::msg("Term serializations should be an object").into());
-    }
-    loop {
-        #[allow(unsafe_code)]
-        // SAFETY: Borrow checker workaround https://github.com/rust-lang/rust/issues/70255
-        let next_event = unsafe {
-            let r: *mut FromReadJsonReader<R> = reader;
-            &mut *r
-        }
-        .read_next_event()?;
-        match next_event {
-            JsonEvent::ObjectKey(key) => match key.as_ref() {
-                "type" => state = Some(State::Type),
-                "value" => state = Some(State::Value),
-                "xml:lang" => state = Some(State::Lang),
-                "datatype" => state = Some(State::Datatype),
-                #[cfg(feature = "rdf-star")]
-                "subject" => subject = Some(read_value(reader, number_of_recursive_calls + 1)?),
-                #[cfg(feature = "rdf-star")]
-                "predicate" => predicate = Some(read_value(reader, number_of_recursive_calls + 1)?),
-                #[cfg(feature = "rdf-star")]
-                "object" => object = Some(read_value(reader, number_of_recursive_calls + 1)?),
-                _ => {
-                    return Err(QueryResultsSyntaxError::msg(format!(
-                        "Unexpected key in term serialization: '{key}'"
+    Predicate(Box<JsonInnerTermReader>),
+    #[cfg(feature = "rdf-star")]
+    Object(Box<JsonInnerTermReader>),
+}
+
+enum TermType {
+    Uri,
+    BNode,
+    Literal,
+    #[cfg(feature = "rdf-star")]
+    Triple,
+}
+
+impl JsonInnerTermReader {
+    fn read_event(
+        &mut self,
+        event: JsonEvent<'_>,
+    ) -> Result<Option<Term>, QueryResultsSyntaxError> {
+        match &mut self.state {
+            JsonInnerTermReaderState::Start => {
+                if event == JsonEvent::StartObject {
+                    self.state = JsonInnerTermReaderState::Middle;
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg(
+                        "RDF terms must be encoded using objects",
                     ))
-                    .into())
-                }
-            },
-            JsonEvent::StartObject => {
-                if state != Some(State::Value) {
-                    return Err(QueryResultsSyntaxError::msg(
-                        "Unexpected nested object in term serialization",
-                    )
-                    .into());
                 }
             }
-            JsonEvent::String(s) => match state {
-                Some(State::Type) => {
-                    match s.as_ref() {
-                        "uri" => t = Some(Type::Uri),
-                        "bnode" => t = Some(Type::BNode),
-                        "literal" | "typed-literal" => t = Some(Type::Literal),
-                        #[cfg(feature = "rdf-star")]
-                        "triple" => t = Some(Type::Triple),
+            JsonInnerTermReaderState::Middle => match event {
+                JsonEvent::ObjectKey(object_key) => {
+                    self.state = match object_key.as_ref() {
+                        "type" => JsonInnerTermReaderState::TermType,
+                        "value" => JsonInnerTermReaderState::Value,
+                        "datatype" => JsonInnerTermReaderState::Datatype,
+                        "xml:lang" => JsonInnerTermReaderState::Lang,
                         _ => {
                             return Err(QueryResultsSyntaxError::msg(format!(
-                                "Unexpected term type: '{s}'"
-                            ))
-                            .into())
+                                "Unsupported term key: {object_key}"
+                            )));
                         }
                     };
-                    state = None;
-                }
-                Some(State::Value) => {
-                    value = Some(s.into_owned());
-                    state = None;
+                    Ok(None)
                 }
-                Some(State::Lang) => {
-                    lang = Some(s.into_owned());
-                    state = None;
-                }
-                Some(State::Datatype) => {
-                    datatype = Some(NamedNode::new(s).map_err(|e| {
-                        QueryResultsSyntaxError::msg(format!("Invalid datatype IRI: {e}"))
-                    })?);
-                    state = None;
-                }
-                _ => (), // impossible
-            },
-            JsonEvent::EndObject => {
-                if let Some(s) = state {
-                    if s == State::Value {
-                        state = None; // End of triple
-                    } else {
-                        return Err(QueryResultsSyntaxError::msg(
-                            "Term description values should be string",
-                        )
-                        .into());
-                    }
-                } else {
-                    return match t {
+                JsonEvent::EndObject => {
+                    self.state = JsonInnerTermReaderState::Start;
+                    match self.term_type.take() {
                         None => Err(QueryResultsSyntaxError::msg(
                             "Term serialization should have a 'type' key",
-                        )
-                        .into()),
-                        Some(Type::Uri) => Ok(NamedNode::new(value.ok_or_else(|| {
-                            QueryResultsSyntaxError::msg(
-                                "uri serialization should have a 'value' key",
-                            )
-                        })?)
-                        .map_err(|e| {
-                            QueryResultsSyntaxError::msg(format!("Invalid uri value: {e}"))
-                        })?
-                        .into()),
-                        Some(Type::BNode) => Ok(BlankNode::new(value.ok_or_else(|| {
-                            QueryResultsSyntaxError::msg(
-                                "bnode serialization should have a 'value' key",
-                            )
-                        })?)
-                        .map_err(|e| {
-                            QueryResultsSyntaxError::msg(format!("Invalid bnode value: {e}"))
-                        })?
-                        .into()),
-                        Some(Type::Literal) => {
-                            let value = value.ok_or_else(|| {
+                        )),
+                        Some(TermType::Uri) => Ok(Some(
+                            NamedNode::new(self.value.take().ok_or_else(|| {
+                                QueryResultsSyntaxError::msg(
+                                    "uri serialization should have a 'value' key",
+                                )
+                            })?)
+                            .map_err(|e| {
+                                QueryResultsSyntaxError::msg(format!("Invalid uri value: {e}"))
+                            })?
+                            .into(),
+                        )),
+                        Some(TermType::BNode) => Ok(Some(
+                            BlankNode::new(self.value.take().ok_or_else(|| {
+                                QueryResultsSyntaxError::msg(
+                                    "bnode serialization should have a 'value' key",
+                                )
+                            })?)
+                            .map_err(|e| {
+                                QueryResultsSyntaxError::msg(format!("Invalid bnode value: {e}"))
+                            })?
+                            .into(),
+                        )),
+                        Some(TermType::Literal) => {
+                            let value = self.value.take().ok_or_else(|| {
                                 QueryResultsSyntaxError::msg(
                                     "literal serialization should have a 'value' key",
                                 )
                             })?;
-                            Ok(match lang {
-                                Some(lang) => {
-                                    if let Some(datatype) = datatype {
-                                        if datatype.as_ref() != rdf::LANG_STRING {
-                                            return Err(QueryResultsSyntaxError::msg(format!(
-                                                "xml:lang value '{lang}' provided with the datatype {datatype}"
-                                            )).into())
+                            Ok(Some(match self.lang.take() {
+                                    Some(lang) => {
+                                        if let Some(datatype) = &self.datatype {
+                                            if datatype.as_ref() != rdf::LANG_STRING {
+                                                return Err(QueryResultsSyntaxError::msg(format!(
+                                                    "xml:lang value '{lang}' provided with the datatype {datatype}"
+                                                )));
+                                            }
                                         }
+                                        Literal::new_language_tagged_literal(value, &*lang)
+                                            .map_err(|e| {
+                                                QueryResultsSyntaxError::msg(format!(
+                                                    "Invalid xml:lang value '{lang}': {e}"
+                                                ))
+                                            })?
                                     }
-                                    Literal::new_language_tagged_literal(value, &*lang).map_err(|e| {
-                                        QueryResultsSyntaxError::msg(format!("Invalid xml:lang value '{lang}': {e}"))
-                                    })?
-                                }
-                                None => if let Some(datatype) = datatype {
-                                    Literal::new_typed_literal(value, datatype)
-                                } else {
-                                    Literal::new_simple_literal(value)
-                                }
-                            }
-                                .into())
+                                    None => {
+                                        if let Some(datatype) = self.datatype.take() {
+                                            Literal::new_typed_literal(value, datatype)
+                                        } else {
+                                            Literal::new_simple_literal(value)
+                                        }
+                                    }
+                                }.into()))
                         }
                         #[cfg(feature = "rdf-star")]
-                        Some(Type::Triple) => Ok(Triple::new(
-                            match subject.ok_or_else(|| {
-                                QueryResultsSyntaxError::msg(
-                                    "triple serialization should have a 'subject' key",
-                                )
-                            })? {
-                                Term::NamedNode(subject) => subject.into(),
-                                Term::BlankNode(subject) => subject.into(),
-                                Term::Triple(subject) => Subject::Triple(subject),
-                                Term::Literal(_) => {
-                                    return Err(QueryResultsSyntaxError::msg(
-                                        "The 'subject' value should not be a literal",
+                        Some(TermType::Triple) => Ok(Some(
+                            Triple::new(
+                                match self.subject.take().ok_or_else(|| {
+                                    QueryResultsSyntaxError::msg(
+                                        "triple serialization should have a 'subject' key",
                                     )
-                                    .into())
-                                }
-                            },
-                            match predicate.ok_or_else(|| {
-                                QueryResultsSyntaxError::msg(
-                                    "triple serialization should have a 'predicate' key",
-                                )
-                            })? {
-                                Term::NamedNode(predicate) => predicate,
-                                _ => {
-                                    return Err(QueryResultsSyntaxError::msg(
-                                        "The 'predicate' value should be a uri",
+                                })? {
+                                    Term::NamedNode(subject) => subject.into(),
+                                    Term::BlankNode(subject) => subject.into(),
+                                    Term::Triple(subject) => Subject::Triple(subject),
+                                    Term::Literal(_) => {
+                                        return Err(QueryResultsSyntaxError::msg(
+                                            "The 'subject' value should not be a literal",
+                                        ));
+                                    }
+                                },
+                                match self.predicate.take().ok_or_else(|| {
+                                    QueryResultsSyntaxError::msg(
+                                        "triple serialization should have a 'predicate' key",
                                     )
-                                    .into())
-                                }
-                            },
-                            object.ok_or_else(|| {
-                                QueryResultsSyntaxError::msg(
-                                    "triple serialization should have a 'object' key",
-                                )
-                            })?,
-                        )
-                        .into()),
-                    };
-                }
-            }
-            _ => return Err(QueryResultsSyntaxError::msg("Invalid term serialization").into()),
-        }
-    }
-}
-
-fn read_head<R: Read>(
-    reader: &mut FromReadJsonReader<R>,
-) -> Result<Vec<Variable>, QueryResultsParseError> {
-    if reader.read_next_event()? != JsonEvent::StartObject {
-        return Err(QueryResultsSyntaxError::msg("head should be an object").into());
-    }
-    let mut variables = Vec::new();
-    loop {
-        match reader.read_next_event()? {
-            JsonEvent::ObjectKey(key) => match key.as_ref() {
-                "vars" => {
-                    if reader.read_next_event()? != JsonEvent::StartArray {
-                        return Err(QueryResultsSyntaxError::msg(
-                            "Variable list should be an array",
-                        )
-                        .into());
+                                })? {
+                                    Term::NamedNode(predicate) => predicate,
+                                    _ => {
+                                        return Err(QueryResultsSyntaxError::msg(
+                                            "The 'predicate' value should be a uri",
+                                        ));
+                                    }
+                                },
+                                self.object.take().ok_or_else(|| {
+                                    QueryResultsSyntaxError::msg(
+                                        "triple serialization should have a 'object' key",
+                                    )
+                                })?,
+                            )
+                            .into(),
+                        )),
                     }
-                    loop {
-                        match reader.read_next_event()? {
-                            JsonEvent::String(s) => {
-                                let new_var = Variable::new(s.as_ref()).map_err(|e| {
-                                    QueryResultsSyntaxError::msg(format!(
-                                        "Invalid variable declaration '{s}': {e}"
-                                    ))
-                                })?;
-                                if variables.contains(&new_var) {
-                                    return Err(QueryResultsSyntaxError::msg(format!(
-                                        "The variable {new_var} is declared twice"
-                                    ))
-                                    .into());
-                                }
-                                variables.push(new_var);
-                            }
-                            JsonEvent::EndArray => break,
-                            _ => {
-                                return Err(QueryResultsSyntaxError::msg(
-                                    "Variable names should be strings",
-                                )
-                                .into())
-                            }
+                }
+                _ => unreachable!(),
+            },
+            JsonInnerTermReaderState::TermType => {
+                self.state = JsonInnerTermReaderState::Middle;
+                if let JsonEvent::String(value) = event {
+                    match value.as_ref() {
+                        "uri" => {
+                            self.term_type = Some(TermType::Uri);
+                            Ok(None)
+                        }
+                        "bnode" => {
+                            self.term_type = Some(TermType::BNode);
+                            Ok(None)
                         }
+                        "literal" | "typed-literal" => {
+                            self.term_type = Some(TermType::Literal);
+                            Ok(None)
+                        }
+                        #[cfg(feature = "rdf-star")]
+                        "triple" => {
+                            self.term_type = Some(TermType::Triple);
+                            Ok(None)
+                        }
+                        _ => Err(QueryResultsSyntaxError::msg(format!(
+                            "Unexpected term type: '{value}'"
+                        ))),
                     }
+                } else {
+                    Err(QueryResultsSyntaxError::msg("Term type must be a string"))
                 }
-                "link" => {
-                    if reader.read_next_event()? != JsonEvent::StartArray {
-                        return Err(QueryResultsSyntaxError::msg(
-                            "Variable list should be an array",
-                        )
-                        .into());
-                    }
-                    loop {
-                        match reader.read_next_event()? {
-                            JsonEvent::String(_) => (),
-                            JsonEvent::EndArray => break,
-                            _ => {
-                                return Err(QueryResultsSyntaxError::msg(
-                                    "Link names should be strings",
-                                )
-                                .into())
-                            }
+            }
+            JsonInnerTermReaderState::Value => match event {
+                JsonEvent::String(value) => {
+                    self.value = Some(value.into_owned());
+                    self.state = JsonInnerTermReaderState::Middle;
+                    Ok(None)
+                }
+                #[cfg(feature = "rdf-star")]
+                JsonEvent::StartObject => {
+                    self.state = JsonInnerTermReaderState::InValue;
+                    Ok(None)
+                }
+                _ => {
+                    self.state = JsonInnerTermReaderState::Middle;
+
+                    Err(QueryResultsSyntaxError::msg("Term value must be a string"))
+                }
+            },
+            JsonInnerTermReaderState::Lang => {
+                let result = if let JsonEvent::String(value) = event {
+                    self.lang = Some(value.into_owned());
+                    Ok(None)
+                } else {
+                    Err(QueryResultsSyntaxError::msg("Term lang must be strings"))
+                };
+                self.state = JsonInnerTermReaderState::Middle;
+
+                result
+            }
+            JsonInnerTermReaderState::Datatype => {
+                let result = if let JsonEvent::String(value) = event {
+                    match NamedNode::new(value) {
+                        Ok(datatype) => {
+                            self.datatype = Some(datatype);
+                            Ok(None)
                         }
+                        Err(e) => Err(QueryResultsSyntaxError::msg(format!(
+                            "Invalid datatype: {e}"
+                        ))),
                     }
+                } else {
+                    Err(QueryResultsSyntaxError::msg("Term lang must be strings"))
+                };
+                self.state = JsonInnerTermReaderState::Middle;
+
+                result
+            }
+            #[cfg(feature = "rdf-star")]
+            JsonInnerTermReaderState::InValue => match event {
+                JsonEvent::ObjectKey(object_key) => {
+                    self.state = match object_key.as_ref() {
+                        "subject" => JsonInnerTermReaderState::Subject(Box::default()),
+                        "predicate" => JsonInnerTermReaderState::Predicate(Box::default()),
+                        "object" => JsonInnerTermReaderState::Object(Box::default()),
+                        _ => {
+                            return Err(QueryResultsSyntaxError::msg(format!(
+                                "Unsupported value key: {object_key}"
+                            )));
+                        }
+                    };
+                    Ok(None)
+                }
+                JsonEvent::EndObject => {
+                    self.state = JsonInnerTermReaderState::Middle;
+                    Ok(None)
                 }
-                _ => ignore_value(reader)?,
+                _ => unreachable!(),
             },
-            JsonEvent::EndObject => return Ok(variables),
-            _ => return Err(QueryResultsSyntaxError::msg("Invalid head serialization").into()),
-        }
-    }
-}
-
-fn ignore_value<R: Read>(reader: &mut FromReadJsonReader<R>) -> Result<(), QueryResultsParseError> {
-    let mut nesting = 0;
-    loop {
-        match reader.read_next_event()? {
-            JsonEvent::Boolean(_)
-            | JsonEvent::Null
-            | JsonEvent::Number(_)
-            | JsonEvent::String(_) => {
-                if nesting == 0 {
-                    return Ok(());
+            #[cfg(feature = "rdf-star")]
+            JsonInnerTermReaderState::Subject(ref mut inner_state) => {
+                if let Some(term) = inner_state.read_event(event)? {
+                    self.state = JsonInnerTermReaderState::InValue;
+                    self.subject = Some(term);
                 }
+                Ok(None)
             }
-            JsonEvent::ObjectKey(_) => (),
-            JsonEvent::StartArray | JsonEvent::StartObject => nesting += 1,
-            JsonEvent::EndArray | JsonEvent::EndObject => {
-                nesting -= 1;
-                if nesting == 0 {
-                    return Ok(());
+            #[cfg(feature = "rdf-star")]
+            JsonInnerTermReaderState::Predicate(ref mut inner_state) => {
+                if let Some(term) = inner_state.read_event(event)? {
+                    self.state = JsonInnerTermReaderState::InValue;
+                    self.predicate = Some(term);
                 }
+                Ok(None)
             }
-            JsonEvent::Eof => {
-                return Err(QueryResultsSyntaxError::msg("Unexpected end of file").into())
+            #[cfg(feature = "rdf-star")]
+            JsonInnerTermReaderState::Object(ref mut inner_state) => {
+                if let Some(term) = inner_state.read_event(event)? {
+                    self.state = JsonInnerTermReaderState::InValue;
+                    self.object = Some(term);
+                }
+                Ok(None)
             }
         }
     }
 }
+
+pub struct JsonBufferedSolutionsIterator {
+    mapping: BTreeMap<String, usize>,
+    bindings: std::vec::IntoIter<(Vec<String>, Vec<Term>)>,
+}
+
+impl JsonBufferedSolutionsIterator {
+    fn next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
+        let Some((variables, values)) = self.bindings.next() else {
+            return Ok(None);
+        };
+        let mut new_bindings = vec![None; self.mapping.len()];
+        for (variable, value) in variables.into_iter().zip(values) {
+            let k = *self.mapping.get(&variable).ok_or_else(|| {
+                QueryResultsSyntaxError::msg(format!(
+                    "The variable {variable} has not been defined in the header"
+                ))
+            })?;
+            new_bindings[k] = Some(value);
+        }
+        Ok(Some(new_bindings))
+    }
+}
diff --git a/lib/sparesults/src/lib.rs b/lib/sparesults/src/lib.rs
index ea3135c4..bfe45051 100644
--- a/lib/sparesults/src/lib.rs
+++ b/lib/sparesults/src/lib.rs
@@ -16,5 +16,9 @@ mod xml;
 pub use crate::error::{QueryResultsParseError, QueryResultsSyntaxError, TextPosition};
 pub use crate::format::QueryResultsFormat;
 pub use crate::parser::{FromReadQueryResultsReader, FromReadSolutionsReader, QueryResultsParser};
+#[cfg(feature = "async-tokio")]
+pub use crate::parser::{FromTokioAsyncReadQueryResultsReader, FromTokioAsyncReadSolutionsReader};
+#[cfg(feature = "async-tokio")]
+pub use crate::serializer::ToTokioAsyncWriteSolutionsWriter;
 pub use crate::serializer::{QueryResultsSerializer, ToWriteSolutionsWriter};
 pub use crate::solution::QuerySolution;
diff --git a/lib/sparesults/src/parser.rs b/lib/sparesults/src/parser.rs
index f95d9355..602a5e61 100644
--- a/lib/sparesults/src/parser.rs
+++ b/lib/sparesults/src/parser.rs
@@ -1,12 +1,18 @@
 use crate::csv::{TsvQueryResultsReader, TsvSolutionsReader};
 use crate::error::{QueryResultsParseError, QueryResultsSyntaxError};
 use crate::format::QueryResultsFormat;
-use crate::json::{JsonQueryResultsReader, JsonSolutionsReader};
+use crate::json::{FromReadJsonQueryResultsReader, FromReadJsonSolutionsReader};
+#[cfg(feature = "async-tokio")]
+use crate::json::{
+    FromTokioAsyncReadJsonQueryResultsReader, FromTokioAsyncReadJsonSolutionsReader,
+};
 use crate::solution::QuerySolution;
 use crate::xml::{XmlQueryResultsReader, XmlSolutionsReader};
 use oxrdf::Variable;
 use std::io::Read;
 use std::sync::Arc;
+#[cfg(feature = "async-tokio")]
+use tokio::io::AsyncRead;
 
 /// Parsers for [SPARQL query](https://www.w3.org/TR/sparql11-query/) results serialization formats.
 ///
@@ -45,24 +51,24 @@ impl QueryResultsParser {
         Self { format }
     }
 
-    /// Reads a result file.
+    /// Reads a result file from a [`Read`] implementation.
     ///
-    /// Reads are buffered.
+    /// Reads are automatically buffered.
     ///
     /// Example in XML (the API is the same for JSON and TSV):
     /// ```
     /// use sparesults::{QueryResultsFormat, QueryResultsParser, FromReadQueryResultsReader};
     /// use oxrdf::{Literal, Variable};
     ///
-    /// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
+    /// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
     ///
     /// // boolean
-    /// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice())? {
+    /// if let FromReadQueryResultsReader::Boolean(v) = xml_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice())? {
     ///     assert_eq!(v, true);
     /// }
     ///
     /// // solutions
-    /// if let FromReadQueryResultsReader::Solutions(solutions) = json_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice())? {
+    /// if let FromReadQueryResultsReader::Solutions(solutions) = xml_parser.parse_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice())? {
     ///     assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
     ///     for solution in solutions {
     ///         assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
@@ -82,17 +88,17 @@ impl QueryResultsParser {
                     variables,
                 } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
                     variables: variables.into(),
-                    solutions: SolutionsReaderKind::Xml(solutions),
+                    solutions: FromReadSolutionsReaderKind::Xml(solutions),
                 }),
             },
-            QueryResultsFormat::Json => match JsonQueryResultsReader::read(reader)? {
-                JsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
-                JsonQueryResultsReader::Solutions {
+            QueryResultsFormat::Json => match FromReadJsonQueryResultsReader::read(reader)? {
+                FromReadJsonQueryResultsReader::Boolean(r) => FromReadQueryResultsReader::Boolean(r),
+                FromReadJsonQueryResultsReader::Solutions {
                     solutions,
                     variables,
                 } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
                     variables: variables.into(),
-                    solutions: SolutionsReaderKind::Json(solutions),
+                    solutions: FromReadSolutionsReaderKind::Json(solutions),
                 }),
             },
             QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
@@ -103,7 +109,7 @@ impl QueryResultsParser {
                     variables,
                 } => FromReadQueryResultsReader::Solutions(FromReadSolutionsReader {
                     variables: variables.into(),
-                    solutions: SolutionsReaderKind::Tsv(solutions),
+                    solutions: FromReadSolutionsReaderKind::Tsv(solutions),
                 }),
             },
         })
@@ -116,6 +122,56 @@ impl QueryResultsParser {
     ) -> Result<FromReadQueryResultsReader<R>, QueryResultsParseError> {
         self.parse_read(reader)
     }
+
+    /// Reads a result file from a Tokio [`AsyncRead`] implementation.
+    ///
+    /// Reads are automatically buffered.
+    ///
+    /// Example in XML (the API is the same for JSON and TSV):
+    /// ```no_run
+    /// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader};
+    /// use oxrdf::{Literal, Variable};
+    ///
+    /// # #[tokio::main(flavor = "current_thread")]
+    /// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
+    /// let xml_parser = QueryResultsParser::from_format(QueryResultsFormat::Xml);
+    ///
+    /// // boolean
+    /// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = xml_parser.parse_tokio_async_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head/><boolean>true</boolean></sparql>"#.as_slice()).await? {
+    ///     assert_eq!(v, true);
+    /// }
+    ///
+    /// // solutions
+    /// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = xml_parser.parse_tokio_async_read(br#"<sparql xmlns="http://www.w3.org/2005/sparql-results#"><head><variable name="foo"/><variable name="bar"/></head><results><result><binding name="foo"><literal>test</literal></binding></result></results></sparql>"#.as_slice()).await? {
+    ///     assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
+    ///     while let Some(solution) = solutions.next().await {
+    ///         assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[cfg(feature = "async-tokio")]
+    pub async fn parse_tokio_async_read<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<FromTokioAsyncReadQueryResultsReader<R>, QueryResultsParseError> {
+        Ok(match self.format {
+            QueryResultsFormat::Xml => return Err(QueryResultsSyntaxError::msg("The XML query results parser does not support Tokio AsyncRead yet").into()),
+            QueryResultsFormat::Json => match FromTokioAsyncReadJsonQueryResultsReader::read(reader).await? {
+                FromTokioAsyncReadJsonQueryResultsReader::Boolean(r) => FromTokioAsyncReadQueryResultsReader::Boolean(r),
+                FromTokioAsyncReadJsonQueryResultsReader::Solutions {
+                    solutions,
+                    variables,
+                } => FromTokioAsyncReadQueryResultsReader::Solutions(FromTokioAsyncReadSolutionsReader {
+                    variables: variables.into(),
+                    solutions: FromTokioAsyncReadSolutionsReaderKind::Json(solutions),
+                }),
+            },
+            QueryResultsFormat::Csv => return Err(QueryResultsSyntaxError::msg("CSV SPARQL results syntax is lossy and can't be parsed to a proper RDF representation").into()),
+            QueryResultsFormat::Tsv => return Err(QueryResultsSyntaxError::msg("The TSV query results parser does not support Tokio AsyncRead yet").into()),
+        })
+    }
 }
 
 impl From<QueryResultsFormat> for QueryResultsParser {
@@ -133,16 +189,16 @@ impl From<QueryResultsFormat> for QueryResultsParser {
 /// use oxrdf::{Literal, Variable};
 /// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser};
 ///
-/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
+/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
 ///
 /// // boolean
-/// if let FromReadQueryResultsReader::Boolean(v) = json_parser.parse_read(b"true".as_slice())? {
+/// if let FromReadQueryResultsReader::Boolean(v) = tsv_parser.parse_read(b"true".as_slice())? {
 ///     assert_eq!(v, true);
 /// }
 ///
 /// // solutions
 /// if let FromReadQueryResultsReader::Solutions(solutions) =
-///     json_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())?
+///     tsv_parser.parse_read(b"?foo\t?bar\n\"test\"\t".as_slice())?
 /// {
 ///     assert_eq!(
 ///         solutions.variables(),
@@ -188,12 +244,12 @@ pub enum FromReadQueryResultsReader<R: Read> {
 /// ```
 pub struct FromReadSolutionsReader<R: Read> {
     variables: Arc<[Variable]>,
-    solutions: SolutionsReaderKind<R>,
+    solutions: FromReadSolutionsReaderKind<R>,
 }
 
-enum SolutionsReaderKind<R: Read> {
+enum FromReadSolutionsReaderKind<R: Read> {
     Xml(XmlSolutionsReader<R>),
-    Json(JsonSolutionsReader<R>),
+    Json(FromReadJsonSolutionsReader<R>),
     Tsv(TsvSolutionsReader<R>),
 }
 
@@ -205,9 +261,9 @@ impl<R: Read> FromReadSolutionsReader<R> {
     /// use oxrdf::Variable;
     /// use sparesults::{FromReadQueryResultsReader, QueryResultsFormat, QueryResultsParser};
     ///
-    /// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
+    /// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
     /// if let FromReadQueryResultsReader::Solutions(solutions) =
-    ///     json_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())?
+    ///     tsv_parser.parse_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())?
     /// {
     ///     assert_eq!(
     ///         solutions.variables(),
@@ -231,9 +287,141 @@ impl<R: Read> Iterator for FromReadSolutionsReader<R> {
     fn next(&mut self) -> Option<Self::Item> {
         Some(
             match &mut self.solutions {
-                SolutionsReaderKind::Xml(reader) => reader.read_next(),
-                SolutionsReaderKind::Json(reader) => reader.read_next(),
-                SolutionsReaderKind::Tsv(reader) => reader.read_next(),
+                FromReadSolutionsReaderKind::Xml(reader) => reader.read_next(),
+                FromReadSolutionsReaderKind::Json(reader) => reader.read_next(),
+                FromReadSolutionsReaderKind::Tsv(reader) => reader.read_next(),
+            }
+            .transpose()?
+            .map(|values| (Arc::clone(&self.variables), values).into()),
+        )
+    }
+}
+
+/// The reader for a given read of a results file.
+///
+/// It is either a read boolean ([`bool`]) or a streaming reader of a set of solutions ([`FromReadSolutionsReader`]).
+///
+/// Example in TSV (the API is the same for JSON and XML):
+/// ```no_run
+/// use oxrdf::{Literal, Variable};
+/// use sparesults::{
+///     FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
+/// };
+///
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
+/// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
+///
+/// // boolean
+/// if let FromTokioAsyncReadQueryResultsReader::Boolean(v) = tsv_parser
+///     .parse_tokio_async_read(b"true".as_slice())
+///     .await?
+/// {
+///     assert_eq!(v, true);
+/// }
+///
+/// // solutions
+/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = tsv_parser
+///     .parse_tokio_async_read(b"?foo\t?bar\n\"test\"\t".as_slice())
+///     .await?
+/// {
+///     assert_eq!(
+///         solutions.variables(),
+///         &[
+///             Variable::new_unchecked("foo"),
+///             Variable::new_unchecked("bar")
+///         ]
+///     );
+///     while let Some(solution) = solutions.next().await {
+///         assert_eq!(
+///             solution?.iter().collect::<Vec<_>>(),
+///             vec![(
+///                 &Variable::new_unchecked("foo"),
+///                 &Literal::from("test").into()
+///             )]
+///         );
+///     }
+/// }
+/// # Ok(())
+/// # }
+/// ```
+#[cfg(feature = "async-tokio")]
+pub enum FromTokioAsyncReadQueryResultsReader<R: AsyncRead + Unpin> {
+    Solutions(FromTokioAsyncReadSolutionsReader<R>),
+    Boolean(bool),
+}
+
+/// A streaming reader of a set of [`QuerySolution`] solutions.
+///
+/// It implements the [`Iterator`] API to iterate over the solutions.
+///
+/// Example in JSON (the API is the same for XML and TSV):
+/// ```
+/// use sparesults::{QueryResultsFormat, QueryResultsParser, FromTokioAsyncReadQueryResultsReader};
+/// use oxrdf::{Literal, Variable};
+///
+/// # #[tokio::main(flavor = "current_thread")]
+/// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
+/// let json_parser = QueryResultsParser::from_format(QueryResultsFormat::Json);
+/// if let FromTokioAsyncReadQueryResultsReader::Solutions(mut solutions) = json_parser.parse_tokio_async_read(br#"{"head":{"vars":["foo","bar"]},"results":{"bindings":[{"foo":{"type":"literal","value":"test"}}]}}"#.as_slice()).await? {
+///     assert_eq!(solutions.variables(), &[Variable::new_unchecked("foo"), Variable::new_unchecked("bar")]);
+///     while let Some(solution) = solutions.next().await {
+///         assert_eq!(solution?.iter().collect::<Vec<_>>(), vec![(&Variable::new_unchecked("foo"), &Literal::from("test").into())]);
+///     }
+/// }
+/// # Ok(())
+/// # }
+/// ```
+#[cfg(feature = "async-tokio")]
+pub struct FromTokioAsyncReadSolutionsReader<R: AsyncRead + Unpin> {
+    variables: Arc<[Variable]>,
+    solutions: FromTokioAsyncReadSolutionsReaderKind<R>,
+}
+
+#[cfg(feature = "async-tokio")]
+enum FromTokioAsyncReadSolutionsReaderKind<R: AsyncRead + Unpin> {
+    Json(FromTokioAsyncReadJsonSolutionsReader<R>),
+}
+
+#[cfg(feature = "async-tokio")]
+impl<R: AsyncRead + Unpin> FromTokioAsyncReadSolutionsReader<R> {
+    /// Ordered list of the declared variables at the beginning of the results.
+    ///
+    /// Example in TSV (the API is the same for JSON and XML):
+    /// ```no_run
+    /// use oxrdf::Variable;
+    /// use sparesults::{
+    ///     FromTokioAsyncReadQueryResultsReader, QueryResultsFormat, QueryResultsParser,
+    /// };
+    ///
+    /// # #[tokio::main(flavor = "current_thread")]
+    /// # async fn main() -> Result<(), sparesults::QueryResultsParseError> {
+    /// let tsv_parser = QueryResultsParser::from_format(QueryResultsFormat::Tsv);
+    /// if let FromTokioAsyncReadQueryResultsReader::Solutions(solutions) = tsv_parser
+    ///     .parse_tokio_async_read(b"?foo\t?bar\n\"ex1\"\t\"ex2\"".as_slice())
+    ///     .await?
+    /// {
+    ///     assert_eq!(
+    ///         solutions.variables(),
+    ///         &[
+    ///             Variable::new_unchecked("foo"),
+    ///             Variable::new_unchecked("bar")
+    ///         ]
+    ///     );
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    #[inline]
+    pub fn variables(&self) -> &[Variable] {
+        &self.variables
+    }
+
+    /// Reads the next solution or returns `None` if the file is finished.
+    pub async fn next(&mut self) -> Option<Result<QuerySolution, QueryResultsParseError>> {
+        Some(
+            match &mut self.solutions {
+                FromTokioAsyncReadSolutionsReaderKind::Json(reader) => reader.read_next().await,
             }
             .transpose()?
             .map(|values| (Arc::clone(&self.variables), values).into()),