Python: makes type stubs and code valid according to Mypy strict mode

pull/315/head
Tpt committed by Thomas Tanon
parent 45c541edad
commit 3712142e6f
  1. .github/workflows/tests.yml (2 lines changed)
  2. python/generate_stubs.py (248 lines changed)
  3. python/src/io.rs (5 lines changed)
  4. python/src/model.rs (2 lines changed)
  5. python/src/store.rs (17 lines changed)
  6. python/tests/test_io.py (14 lines changed)
  7. python/tests/test_model.py (42 lines changed)
  8. python/tests/test_store.py (77 lines changed)

@ -114,5 +114,5 @@ jobs:
working-directory: ./python
- run: python -m mypy.stubtest pyoxigraph --allowlist=mypy_allowlist.txt
working-directory: ./python
- run: python -m mypy generate_stubs.py tests
- run: python -m mypy generate_stubs.py tests --strict
working-directory: ./python

@ -5,48 +5,84 @@ import inspect
import logging
import re
import subprocess
from functools import reduce
from typing import Set, List, Mapping, Any
from typing import Set, List, Mapping, Any, Tuple, Union, Optional, Dict
def _path_to_type(*elements: str) -> ast.AST:
base: ast.AST = ast.Name(id=elements[0], ctx=AST_LOAD)
for e in elements[1:]:
base = ast.Attribute(value=base, attr=e, ctx=AST_LOAD)
return base
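The helper above folds a dotted path into nested ast.Attribute nodes. A minimal standalone sketch of the same idea (relying on ast.unparse, which the script already uses for its output, so Python 3.9+ is assumed):

import ast

AST_LOAD = ast.Load()

def _path_to_type(*elements: str) -> ast.AST:
    # "typing", "Iterator" becomes Attribute(Name("typing"), attr="Iterator")
    base: ast.AST = ast.Name(id=elements[0], ctx=AST_LOAD)
    for e in elements[1:]:
        base = ast.Attribute(value=base, attr=e, ctx=AST_LOAD)
    return base

print(ast.unparse(_path_to_type("typing", "Iterator")))  # prints "typing.Iterator"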
AST_LOAD = ast.Load()
AST_ELLIPSIS = ast.Ellipsis()
AST_STORE = ast.Store()
AST_TYPING_ANY = ast.Attribute(
value=ast.Name(id="typing", ctx=AST_LOAD), attr="Any", ctx=AST_LOAD
)
AST_TYPING_ANY = _path_to_type("typing", "Any")
GENERICS = {
"iter": ast.Attribute(
value=ast.Name(id="typing", ctx=AST_LOAD), attr="Iterator", ctx=AST_LOAD
),
"list": ast.Attribute(
value=ast.Name(id="typing", ctx=AST_LOAD), attr="List", ctx=AST_LOAD
),
"iterable": _path_to_type("typing", "Iterable"),
"iterator": _path_to_type("typing", "Iterator"),
"list": _path_to_type("typing", "List"),
}
OBJECT_MEMBERS = dict(inspect.getmembers(object))
ATTRIBUTES_BLACKLIST = {
"__class__",
"__dir__",
"__doc__",
"__init_subclass__",
"__module__",
"__new__",
"__subclasshook__",
BUILTINS: Dict[str, Union[None, Tuple[List[ast.AST], ast.AST]]] = {
"__annotations__": None,
"__bool__": ([], _path_to_type("bool")),
"__bytes__": ([], _path_to_type("bytes")),
"__class__": None,
"__contains__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__del__": None,
"__delattr__": ([_path_to_type("str")], _path_to_type("None")),
"__delitem__": ([AST_TYPING_ANY], AST_TYPING_ANY),
"__dict__": None,
"__dir__": None,
"__doc__": None,
"__eq__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__format__": ([_path_to_type("str")], _path_to_type("str")),
"__ge__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__getattribute__": ([_path_to_type("str")], AST_TYPING_ANY),
"__getitem__": ([AST_TYPING_ANY], AST_TYPING_ANY),
"__gt__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__hash__": ([], _path_to_type("int")),
"__init__": ([], _path_to_type("None")),
"__init_subclass__": None,
"__iter__": ([], AST_TYPING_ANY),
"__le__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__len__": ([], _path_to_type("int")),
"__lt__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__module__": None,
"__ne__": ([AST_TYPING_ANY], _path_to_type("bool")),
"__new__": None,
"__next__": ([], AST_TYPING_ANY),
"__reduce__": None,
"__reduce_ex__": None,
"__repr__": ([], _path_to_type("str")),
"__setattr__": ([_path_to_type("str"), AST_TYPING_ANY], _path_to_type("None")),
"__setitem__": ([AST_TYPING_ANY, AST_TYPING_ANY], AST_TYPING_ANY),
"__sizeof__": None,
"__str__": ([], _path_to_type("str")),
"__subclasshook__": None,
}
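For context: each BUILTINS entry that is not None supplies the parameter and return annotations for a dunder method whose Rust docstring carries no :type:/:rtype: fields, while entries mapped to None are skipped entirely (see the BUILTINS.get(member_name, ()) is None check in class_stubs below). As a rough illustration, the "__eq__" entry ends up as a stub line like the following, where the parameter name is only a plausible placeholder since the real one is taken from the runtime signature:

def __eq__(self, other: typing.Any) -> bool: ...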
def module_stubs(module) -> ast.Module:
def module_stubs(module: Any) -> ast.Module:
types_to_import = {"typing"}
classes = []
functions = []
for (member_name, member_value) in inspect.getmembers(module):
element_path = [module.__name__, member_name]
if member_name.startswith("__"):
pass
elif inspect.isclass(member_value):
classes.append(class_stubs(member_name, member_value, types_to_import))
classes.append(
class_stubs(member_name, member_value, element_path, types_to_import)
)
elif inspect.isbuiltin(member_value):
functions.append(function_stub(member_name, member_value, types_to_import))
functions.append(
function_stub(member_name, member_value, element_path, types_to_import)
)
else:
logging.warning(f"Unsupported root construction {member_name}")
return ast.Module(
@ -57,36 +93,48 @@ def module_stubs(module) -> ast.Module:
)
def class_stubs(cls_name: str, cls_def, types_to_import: Set[str]) -> ast.ClassDef:
def class_stubs(
cls_name: str, cls_def: Any, element_path: List[str], types_to_import: Set[str]
) -> ast.ClassDef:
attributes: List[ast.AST] = []
methods: List[ast.AST] = []
magic_methods: List[ast.AST] = []
for (member_name, member_value) in inspect.getmembers(cls_def):
current_element_path = element_path + [member_name]
if member_name == "__init__":
try:
inspect.signature(cls_def) # we check it actually exists
methods = [
function_stub(member_name, cls_def, types_to_import)
function_stub(
member_name, cls_def, current_element_path, types_to_import
)
] + methods
except ValueError as e:
if "no signature found" not in str(e):
raise ValueError(
f"Error while parsing signature of {cls_name}.__init__: {e}"
)
elif member_name in ATTRIBUTES_BLACKLIST or member_value == OBJECT_MEMBERS.get(
member_name
elif (
member_value == OBJECT_MEMBERS.get(member_name)
or BUILTINS.get(member_name, ()) is None
):
pass
elif inspect.isdatadescriptor(member_value):
attributes.extend(
data_descriptor_stub(member_name, member_value, types_to_import)
data_descriptor_stub(
member_name, member_value, current_element_path, types_to_import
)
)
elif inspect.isroutine(member_value):
(magic_methods if member_name.startswith("__") else methods).append(
function_stub(member_name, member_value, types_to_import)
function_stub(
member_name, member_value, current_element_path, types_to_import
)
)
else:
logging.warning(f"Unsupported member {member_name} of class {cls_name}")
logging.warning(
f"Unsupported member {member_name} of class {'.'.join(element_path)}"
)
doc = inspect.getdoc(cls_def)
return ast.ClassDef(
@ -100,28 +148,29 @@ def class_stubs(cls_name: str, cls_def, types_to_import: Set[str]) -> ast.ClassD
+ magic_methods
)
or [AST_ELLIPSIS],
decorator_list=[
ast.Attribute(
value=ast.Name(id="typing", ctx=AST_LOAD), attr="final", ctx=AST_LOAD
)
],
decorator_list=[_path_to_type("typing", "final")],
)
def data_descriptor_stub(
data_desc_name: str, data_desc_def, types_to_import: Set[str]
) -> tuple:
data_desc_name: str,
data_desc_def: Any,
element_path: List[str],
types_to_import: Set[str],
) -> Union[Tuple[ast.AnnAssign, ast.Expr], Tuple[ast.AnnAssign]]:
annotation = None
doc_comment = None
doc = inspect.getdoc(data_desc_def)
if doc is not None:
annotation = returns_stub(doc, types_to_import)
m = re.findall(r":return: *(.*) *\n", doc)
annotation = returns_stub(data_desc_name, doc, element_path, types_to_import)
m = re.findall(r"^ *:return: *(.*) *$", doc, re.MULTILINE)
if len(m) == 1:
doc_comment = m[0]
elif len(m) > 1:
raise ValueError("Multiple return annotations found with :return:")
raise ValueError(
f"Multiple return annotations found with :return: in {'.'.join(element_path)} documentation"
)
assign = ast.AnnAssign(
target=ast.Name(id=data_desc_name, ctx=AST_STORE),
@ -131,23 +180,33 @@ def data_descriptor_stub(
return (assign, build_doc_comment(doc_comment)) if doc_comment else (assign,)
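As a worked example of what data_descriptor_stub emits, the :return:/:rtype: fields added to DefaultGraph.value in python/src/model.rs further down in this diff should come out roughly as the following fragment of the generated stub (a sketch, not output copied from pyoxigraph.pyi):

import typing

@typing.final
class DefaultGraph:
    value: str
    """the empty string."""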
def function_stub(fn_name: str, fn_def, types_to_import: Set[str]) -> ast.FunctionDef:
body = []
def function_stub(
fn_name: str, fn_def: Any, element_path: List[str], types_to_import: Set[str]
) -> ast.FunctionDef:
body: List[ast.AST] = []
doc = inspect.getdoc(fn_def)
if doc is not None and not fn_name.startswith("__"):
if doc is not None:
body.append(build_doc_comment(doc))
return ast.FunctionDef(
fn_name,
arguments_stub(fn_name, fn_def, doc or "", types_to_import),
arguments_stub(fn_name, fn_def, doc or "", element_path, types_to_import),
body or [AST_ELLIPSIS],
decorator_list=[],
returns=returns_stub(doc, types_to_import) if doc else None,
returns=returns_stub(fn_name, doc, element_path, types_to_import)
if doc
else None,
lineno=0,
)
def arguments_stub(callable_name, callable_def, doc: str, types_to_import: Set[str]):
def arguments_stub(
callable_name: str,
callable_def: Any,
doc: str,
element_path: List[str],
types_to_import: Set[str],
) -> ast.arguments:
real_parameters: Mapping[str, inspect.Parameter] = inspect.signature(
callable_def
).parameters
@ -159,16 +218,29 @@ def arguments_stub(callable_name, callable_def, doc: str, types_to_import: Set[s
parsed_param_types = {}
optional_params = set()
for match in re.findall(r"\n *:type *([a-z_]+): ([^\n]*) *\n", doc):
# Types for magic functions
builtin = BUILTINS.get(callable_name)
if isinstance(builtin, tuple):
param_names = list(real_parameters.keys())
if param_names and param_names[0] == "self":
del param_names[0]
for name, t in zip(param_names, builtin[0]):
parsed_param_types[name] = t
# Types from comment
for match in re.findall(r"^ *:type *([a-z_]+): ([^\n]*) *$", doc, re.MULTILINE):
if match[0] not in real_parameters:
raise ValueError(
f"The parameter {match[0]} is defined in the documentation but not in the function signature"
f"The parameter {match[0]} of {'.'.join(element_path)} is defined in the documentation but not in the function signature"
)
type = match[1]
if type.endswith(", optional"):
optional_params.add(match[0])
type = type[:-10]
parsed_param_types[match[0]] = convert_type_from_doc(type, types_to_import)
parsed_param_types[match[0]] = convert_type_from_doc(
type, element_path, types_to_import
)
# we parse the parameters
posonlyargs = []
@ -179,13 +251,9 @@ def arguments_stub(callable_name, callable_def, doc: str, types_to_import: Set[s
kwarg = None
defaults = []
for param in real_parameters.values():
if (
param.name != "self"
and param.name not in parsed_param_types
and (callable_name == "__init__" or not callable_name.startswith("__"))
):
if param.name != "self" and param.name not in parsed_param_types:
raise ValueError(
f"The parameter {param.name} of {callable_name} has no type definition in the function documentation"
f"The parameter {param.name} of {'.'.join(element_path)} has no type definition in the function documentation"
)
param_ast = ast.arg(
arg=param.name, annotation=parsed_param_types.get(param.name)
@ -196,11 +264,11 @@ def arguments_stub(callable_name, callable_def, doc: str, types_to_import: Set[s
default_ast = ast.Constant(param.default)
if param.name not in optional_params:
raise ValueError(
f"Parameter {param.name} is optional according to the type but not flagged as such in the doc"
f"Parameter {param.name} of {'.'.join(element_path)} is optional according to the type but not flagged as such in the doc"
)
elif param.name in optional_params:
raise ValueError(
f"Parameter {param.name} is optional according to the documentation but has no default value"
f"Parameter {param.name} of {'.'.join(element_path)} is optional according to the documentation but has no default value"
)
if param.kind == param.POSITIONAL_ONLY:
@ -228,22 +296,35 @@ def arguments_stub(callable_name, callable_def, doc: str, types_to_import: Set[s
)
def returns_stub(doc: str, types_to_import: Set[str]):
m = re.findall(r"\n *:rtype: *([^\n]*) *\n", doc)
def returns_stub(
callable_name: str, doc: str, element_path: List[str], types_to_import: Set[str]
) -> Optional[ast.AST]:
m = re.findall(r"^ *:rtype: *([^\n]*) *$", doc, re.MULTILINE)
if len(m) == 0:
return None
builtin = BUILTINS.get(callable_name)
if isinstance(builtin, tuple) and builtin[1] is not None:
return builtin[1]
raise ValueError(
f"The return type of {'.'.join(element_path)} has no type definition using :rtype: in the function documentation"
)
elif len(m) == 1:
return convert_type_from_doc(m[0], types_to_import)
return convert_type_from_doc(m[0], element_path, types_to_import)
else:
raise ValueError("Multiple return type annotations found with :rtype:")
raise ValueError(
f"Multiple return type annotations found with :rtype: for {'.'.join(element_path)}"
)
def convert_type_from_doc(type_str: str, types_to_import: Set[str]):
def convert_type_from_doc(
type_str: str, element_path: List[str], types_to_import: Set[str]
) -> ast.AST:
type_str = type_str.strip()
return parse_type_to_ast(type_str, types_to_import)
return parse_type_to_ast(type_str, element_path, types_to_import)
def parse_type_to_ast(type_str: str, types_to_import: Set[str]):
def parse_type_to_ast(
type_str: str, element_path: List[str], types_to_import: Set[str]
) -> ast.AST:
# let's tokenize
tokens = []
current_token = ""
@ -272,32 +353,30 @@ def parse_type_to_ast(type_str: str, types_to_import: Set[str]):
stack[-1].append(token)
# then it's easy
def parse_sequence(sequence):
def parse_sequence(sequence: List[Any]) -> ast.AST:
# we split based on "or"
or_groups = [[]]
or_groups: List[List[str]] = [[]]
for e in sequence:
if e == "or":
or_groups.append([])
else:
or_groups[-1].append(e)
if any(not g for g in or_groups):
raise ValueError(f'Not able to parse type "{type_str}"')
raise ValueError(
f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}"
)
new_elements = []
new_elements: List[ast.AST] = []
for group in or_groups:
if len(group) == 1 and isinstance(group[0], str):
parts = group[0].split(".")
if any(not p for p in parts):
raise ValueError(f'Not able to parse type "{type_str}"')
raise ValueError(
f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}"
)
if len(parts) > 1:
types_to_import.add(parts[0])
new_elements.append(
reduce(
lambda acc, n: ast.Attribute(value=acc, attr=n, ctx=AST_LOAD),
parts[1:],
ast.Name(id=parts[0], ctx=AST_LOAD),
)
)
new_elements.append(_path_to_type(*parts))
elif (
len(group) == 2
and isinstance(group[0], str)
@ -305,7 +384,7 @@ def parse_type_to_ast(type_str: str, types_to_import: Set[str]):
):
if group[0] not in GENERICS:
raise ValueError(
f'Constructor {group[0]} is not supported in type "{type_str}"'
f"Constructor {group[0]} is not supported in type '{type_str}' used by {'.'.join(element_path)}"
)
new_elements.append(
ast.Subscript(
@ -315,14 +394,12 @@ def parse_type_to_ast(type_str: str, types_to_import: Set[str]):
)
)
else:
raise ValueError(f'Not able to parse type "{type_str}"')
raise ValueError(
f"Not able to parse type '{type_str}' used by {'.'.join(element_path)}"
)
return (
ast.Subscript(
value=ast.Attribute(
value=ast.Name(id="typing", ctx=AST_LOAD),
attr="Union",
ctx=AST_LOAD,
),
value=_path_to_type("typing", "Union"),
slice=ast.Tuple(elts=new_elements, ctx=AST_LOAD),
ctx=AST_LOAD,
)
@ -333,7 +410,7 @@ def parse_type_to_ast(type_str: str, types_to_import: Set[str]):
return parse_sequence(stack[0])
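Putting the tokenizer and parse_sequence together, the type strings used in the Rust docstrings map onto typing constructs roughly as follows (a sketch of the intended behaviour, not output captured from a run):

# "str or None"                 -> typing.Union[str, None]
# "iterator(Quad)"              -> typing.Iterator[Quad]   (via the GENERICS table)
# "io.RawIOBase or io.BufferedIOBase or str"
#                               -> typing.Union[io.RawIOBase, io.BufferedIOBase, str],
#                                  with "io" added to types_to_import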
def build_doc_comment(doc: str):
def build_doc_comment(doc: str) -> ast.Expr:
lines = [l.strip() for l in doc.split("\n")]
clean_lines = []
for l in lines:
@ -371,6 +448,9 @@ if __name__ == "__main__":
)
args = parser.parse_args()
stub_content = ast.unparse(module_stubs(importlib.import_module(args.module_name)))
stub_content = stub_content.replace(
", /", ""
) # TODO: remove when targeting Python 3.8+
if args.black:
stub_content = format_with_black(stub_content)
args.out.write(stub_content)

@ -38,7 +38,7 @@ pub fn add_to_module(module: &PyModule) -> PyResult<()> {
/// :param base_iri: the base IRI used to resolve the relative IRIs in the file or :py:const:`None` if relative IRI resolution should not be done.
/// :type base_iri: str or None, optional
/// :return: an iterator of RDF triples or quads depending on the format.
/// :rtype: iter(Triple) or iter(Quad)
/// :rtype: iterator(Triple) or iterator(Quad)
/// :raises ValueError: if the MIME type is not supported.
/// :raises SyntaxError: if the provided data is invalid.
///
@ -104,11 +104,12 @@ pub fn parse(
/// and ``application/xml`` for `RDF/XML <https://www.w3.org/TR/rdf-syntax-grammar/>`_.
///
/// :param input: the RDF triples and quads to serialize.
/// :type input: iter(Triple) or iter(Quad)
/// :type input: iterable(Triple) or iterable(Quad)
/// :param output: The binary I/O object or file path to write to. For example, it could be a file path as a string or a file writer opened in binary mode with ``open('my_file.ttl', 'wb')``.
/// :type output: io.RawIOBase or io.BufferedIOBase or str
/// :param mime_type: the MIME type of the RDF serialization.
/// :type mime_type: str
/// :rtype: None
/// :raises ValueError: if the MIME type is not supported.
/// :raises TypeError: if a triple is given during a quad format serialization or reverse.
///
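For reference, the parse and serialize docs above correspond to usage along these lines (a minimal sketch assembled from python/tests/test_io.py below, using only absolute IRIs so that no base_iri is needed):

from io import BytesIO
from pyoxigraph import parse, serialize

# parse returns an iterator of Triple (or Quad, depending on the format)
triples = list(parse(BytesIO(b'<http://example.com/foo> <http://example.com/p> "1" .'), "text/turtle"))

# serialize writes the triples back out to a binary file object
output = BytesIO()
serialize(triples, output, "text/turtle")
print(output.getvalue())  # roughly b'<http://example.com/foo> <http://example.com/p> "1" .\n'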

@ -371,6 +371,8 @@ impl PyDefaultGraph {
Self {}
}
/// :return: the empty string.
/// :rtype: str
#[getter]
fn value(&self) -> &str {
""

@ -59,6 +59,7 @@ impl PyStore {
///
/// :param quad: the quad to add.
/// :type quad: Quad
/// :rtype: None
/// :raises IOError: if an I/O error happens during the quad insertion.
///
/// >>> store = Store()
@ -77,6 +78,7 @@ impl PyStore {
///
/// :param quad: the quad to remove.
/// :type quad: Quad
/// :rtype: None
/// :raises IOError: if an I/O error happens during the quad removal.
///
/// >>> store = Store()
@ -104,7 +106,7 @@ impl PyStore {
/// :param graph_name: the quad graph name. To match only the default graph, use :py:class:`DefaultGraph`. To match everything use :py:const:`None`.
/// :type graph_name: NamedNode or BlankNode or DefaultGraph or None, optional
/// :return: an iterator of the quads matching the pattern.
/// :rtype: iter(Quad)
/// :rtype: iterator(Quad)
/// :raises IOError: if an I/O error happens during the quads lookup.
///
/// >>> store = Store()
@ -208,6 +210,7 @@ impl PyStore {
/// :type update: str
/// :param base_iri: the base IRI used to resolve the relative IRIs in the SPARQL update or :py:const:`None` if relative IRI resolution should not be done.
/// :type base_iri: str or None, optional
/// :rtype: None
/// :raises SyntaxError: if the provided update is invalid.
/// :raises IOError: if an I/O error happens while reading the store.
///
@ -270,6 +273,7 @@ impl PyStore {
/// :type base_iri: str or None, optional
/// :param to_graph: if it is a file composed of triples, the graph in which the triples should be stored. By default, the default graph is used.
/// :type to_graph: NamedNode or BlankNode or DefaultGraph or None, optional
/// :rtype: None
/// :raises ValueError: if the MIME type is not supported or the `to_graph` parameter is given with a quad file.
/// :raises SyntaxError: if the provided data is invalid.
/// :raises IOError: if an I/O error happens during a quad insertion.
@ -354,6 +358,7 @@ impl PyStore {
/// :type base_iri: str or None, optional
/// :param to_graph: if it is a file composed of triples, the graph in which the triples should be stored. By default, the default graph is used.
/// :type to_graph: NamedNode or BlankNode or DefaultGraph or None, optional
/// :rtype: None
/// :raises ValueError: if the MIME type is not supported or the `to_graph` parameter is given with a quad file.
/// :raises SyntaxError: if the provided data is invalid.
/// :raises IOError: if an I/O error happens during a quad insertion.
@ -433,6 +438,7 @@ impl PyStore {
/// :type mime_type: str
/// :param from_graph: if a triple based format is requested, the store graph from which dump the triples. By default, the default graph is used.
/// :type from_graph: NamedNode or BlankNode or DefaultGraph or None, optional
/// :rtype: None
/// :raises ValueError: if the MIME type is not supported or the `from_graph` parameter is given with a quad syntax.
/// :raises IOError: if an I/O error happens during a quad lookup
///
@ -492,7 +498,7 @@ impl PyStore {
/// Returns an iterator over all the store named graphs.
///
/// :return: an iterator of the store graph names.
/// :rtype: iter(NamedNode or BlankNode)
/// :rtype: iterator(NamedNode or BlankNode)
/// :raises IOError: if an I/O error happens during the named graphs lookup.
///
/// >>> store = Store()
@ -510,6 +516,7 @@ impl PyStore {
///
/// :param graph_name: the name of the named graph to add.
/// :type graph_name: NamedNode or BlankNode
/// :rtype: None
/// :raises IOError: if an I/O error happens during the named graph insertion.
///
/// >>> store = Store()
@ -537,6 +544,7 @@ impl PyStore {
///
/// :param graph_name: the name of the named graph to clear.
/// :type graph_name: NamedNode or BlankNode or DefaultGraph
/// :rtype: None
/// :raises IOError: if an I/O error happens during the operation.
///
/// >>> store = Store()
@ -562,6 +570,7 @@ impl PyStore {
///
/// :param graph_name: the name of the named graph to remove.
/// :type graph_name: NamedNode or BlankNode or DefaultGraph
/// :rtype: None
/// :raises IOError: if an I/O error happens during the named graph removal.
///
/// >>> store = Store()
@ -588,6 +597,7 @@ impl PyStore {
/// Clears the store by removing all its contents.
///
/// :rtype: None
/// :raises IOError: if an I/O error happens during the operation.
///
/// >>> store = Store()
@ -606,6 +616,7 @@ impl PyStore {
///
/// Flushes are automatically done using background threads but might lag a little bit.
///
/// :rtype: None
/// :raises IOError: if an I/O error happens during the flush.
#[pyo3(text_signature = "($self)")]
fn flush(&self, py: Python<'_>) -> PyResult<()> {
@ -616,6 +627,7 @@ impl PyStore {
///
/// Useful to call after a batch upload or another similar operation.
///
/// :rtype: None
/// :raises IOError: if an I/O error happens during the optimization.
#[pyo3(text_signature = "($self)")]
fn optimize(&self, py: Python<'_>) -> PyResult<()> {
@ -641,6 +653,7 @@ impl PyStore {
///
/// :param target_directory: the directory name to save the database to.
/// :type target_directory: str
/// :rtype: None
/// :raises IOError: if an I/O error happens during the backup.
#[pyo3(text_signature = "($self, target_directory)")]
fn backup(&self, target_directory: &str, py: Python<'_>) -> PyResult<()> {
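Taken together, the Store docstrings above (now with explicit :rtype: None on the mutating methods) correspond to usage along these lines (a sketch assembled from python/tests/test_store.py below, not a complete API reference):

from pyoxigraph import NamedNode, Quad, Store

store = Store()
store.add(Quad(NamedNode("http://foo"), NamedNode("http://bar"), NamedNode("http://baz")))

# quads_for_pattern returns an iterator of Quad; None acts as a wildcard
print(list(store.quads_for_pattern(NamedNode("http://foo"), None, None)))

# update returns None and mutates the store in place
store.update("INSERT DATA { <http://foo> <http://foo> <http://foo> }")
print(len(store))  # 2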

@ -11,7 +11,7 @@ EXAMPLE_TRIPLE = Triple(
class TestParse(unittest.TestCase):
def test_parse_file(self):
def test_parse_file(self) -> None:
with NamedTemporaryFile() as fp:
fp.write(b'<foo> <p> "1" .')
fp.flush()
@ -20,11 +20,11 @@ class TestParse(unittest.TestCase):
[EXAMPLE_TRIPLE],
)
def test_parse_not_existing_file(self):
def test_parse_not_existing_file(self) -> None:
with self.assertRaises(IOError) as _:
parse("/tmp/not-existing-oxigraph-file.ttl", "text/turtle")
def test_parse_str_io(self):
def test_parse_str_io(self) -> None:
self.assertEqual(
list(
parse(
@ -36,7 +36,7 @@ class TestParse(unittest.TestCase):
[EXAMPLE_TRIPLE],
)
def test_parse_bytes_io(self):
def test_parse_bytes_io(self) -> None:
self.assertEqual(
list(
parse(
@ -48,7 +48,7 @@ class TestParse(unittest.TestCase):
[EXAMPLE_TRIPLE],
)
def test_parse_io_error(self):
def test_parse_io_error(self) -> None:
class BadIO(RawIOBase):
pass
@ -57,7 +57,7 @@ class TestParse(unittest.TestCase):
class TestSerialize(unittest.TestCase):
def test_serialize_to_bytes_io(self):
def test_serialize_to_bytes_io(self) -> None:
output = BytesIO()
serialize([EXAMPLE_TRIPLE], output, "text/turtle")
self.assertEqual(
@ -65,7 +65,7 @@ class TestSerialize(unittest.TestCase):
b'<http://example.com/foo> <http://example.com/p> "1" .\n',
)
def test_serialize_to_file(self):
def test_serialize_to_file(self) -> None:
with NamedTemporaryFile() as fp:
serialize([EXAMPLE_TRIPLE], fp.name, "text/turtle")
self.assertEqual(

@ -7,26 +7,26 @@ RDF_LANG_STRING = NamedNode("http://www.w3.org/1999/02/22-rdf-syntax-ns#langStri
class TestNamedNode(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
self.assertEqual(NamedNode("http://foo").value, "http://foo")
def test_string(self):
def test_string(self) -> None:
self.assertEqual(str(NamedNode("http://foo")), "<http://foo>")
def test_equal(self):
def test_equal(self) -> None:
self.assertEqual(NamedNode("http://foo"), NamedNode("http://foo"))
self.assertNotEqual(NamedNode("http://foo"), NamedNode("http://bar"))
class TestBlankNode(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
self.assertEqual(BlankNode("foo").value, "foo")
self.assertNotEqual(BlankNode(), BlankNode())
def test_string(self):
def test_string(self) -> None:
self.assertEqual(str(BlankNode("foo")), "_:foo")
def test_equal(self):
def test_equal(self) -> None:
self.assertEqual(BlankNode("foo"), BlankNode("foo"))
self.assertNotEqual(BlankNode("foo"), BlankNode("bar"))
self.assertNotEqual(BlankNode("foo"), NamedNode("http://foo"))
@ -34,7 +34,7 @@ class TestBlankNode(unittest.TestCase):
class TestLiteral(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
self.assertEqual(Literal("foo").value, "foo")
self.assertEqual(Literal("foo").datatype, XSD_STRING)
@ -45,7 +45,7 @@ class TestLiteral(unittest.TestCase):
self.assertEqual(Literal("foo", datatype=XSD_INTEGER).value, "foo")
self.assertEqual(Literal("foo", datatype=XSD_INTEGER).datatype, XSD_INTEGER)
def test_string(self):
def test_string(self) -> None:
self.assertEqual(str(Literal("foo")), '"foo"')
self.assertEqual(str(Literal("foo", language="en")), '"foo"@en')
self.assertEqual(
@ -53,7 +53,7 @@ class TestLiteral(unittest.TestCase):
'"foo"^^<http://www.w3.org/2001/XMLSchema#integer>',
)
def test_equals(self):
def test_equals(self) -> None:
self.assertEqual(Literal("foo", datatype=XSD_STRING), Literal("foo"))
self.assertEqual(
Literal("foo", language="en", datatype=RDF_LANG_STRING),
@ -66,7 +66,7 @@ class TestLiteral(unittest.TestCase):
class TestTriple(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
t = Triple(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -76,7 +76,7 @@ class TestTriple(unittest.TestCase):
self.assertEqual(t.predicate, NamedNode("http://example.com/p"))
self.assertEqual(t.object, NamedNode("http://example.com/o"))
def test_rdf_star_constructor(self):
def test_rdf_star_constructor(self) -> None:
t = Triple(
Triple(
NamedNode("http://example.com/ss"),
@ -108,7 +108,7 @@ class TestTriple(unittest.TestCase):
),
)
def test_mapping(self):
def test_mapping(self) -> None:
t = Triple(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -118,7 +118,7 @@ class TestTriple(unittest.TestCase):
self.assertEqual(t[1], NamedNode("http://example.com/p"))
self.assertEqual(t[2], NamedNode("http://example.com/o"))
def test_destruct(self):
def test_destruct(self) -> None:
(s, p, o) = Triple(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -128,7 +128,7 @@ class TestTriple(unittest.TestCase):
self.assertEqual(p, NamedNode("http://example.com/p"))
self.assertEqual(o, NamedNode("http://example.com/o"))
def test_string(self):
def test_string(self) -> None:
self.assertEqual(
str(
Triple(
@ -142,7 +142,7 @@ class TestTriple(unittest.TestCase):
class TestQuad(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
t = Quad(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -175,7 +175,7 @@ class TestQuad(unittest.TestCase):
),
)
def test_mapping(self):
def test_mapping(self) -> None:
t = Quad(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -187,7 +187,7 @@ class TestQuad(unittest.TestCase):
self.assertEqual(t[2], NamedNode("http://example.com/o"))
self.assertEqual(t[3], NamedNode("http://example.com/g"))
def test_destruct(self):
def test_destruct(self) -> None:
(s, p, o, g) = Quad(
NamedNode("http://example.com/s"),
NamedNode("http://example.com/p"),
@ -199,7 +199,7 @@ class TestQuad(unittest.TestCase):
self.assertEqual(o, NamedNode("http://example.com/o"))
self.assertEqual(g, NamedNode("http://example.com/g"))
def test_string(self):
def test_string(self) -> None:
self.assertEqual(
str(
Triple(
@ -213,13 +213,13 @@ class TestQuad(unittest.TestCase):
class TestVariable(unittest.TestCase):
def test_constructor(self):
def test_constructor(self) -> None:
self.assertEqual(Variable("foo").value, "foo")
def test_string(self):
def test_string(self) -> None:
self.assertEqual(str(Variable("foo")), "?foo")
def test_equal(self):
def test_equal(self) -> None:
self.assertEqual(Variable("foo"), Variable("foo"))
self.assertNotEqual(Variable("foo"), Variable("bar"))

@ -1,6 +1,7 @@
import os
import unittest
from io import BytesIO, RawIOBase
from typing import Any
from pyoxigraph import *
from tempfile import NamedTemporaryFile
@ -13,7 +14,7 @@ graph = NamedNode("http://graph")
class TestStore(unittest.TestCase):
def test_add(self):
def test_add(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
store.add(Quad(foo, bar, baz, DefaultGraph()))
@ -22,7 +23,7 @@ class TestStore(unittest.TestCase):
store.add(Quad(foo, bar, triple))
self.assertEqual(len(store), 4)
def test_remove(self):
def test_remove(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
store.add(Quad(foo, bar, baz, DefaultGraph()))
@ -30,13 +31,13 @@ class TestStore(unittest.TestCase):
store.remove(Quad(foo, bar, baz))
self.assertEqual(len(store), 1)
def test_len(self):
def test_len(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
store.add(Quad(foo, bar, baz, graph))
self.assertEqual(len(store), 2)
def test_in(self):
def test_in(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
store.add(Quad(foo, bar, baz, DefaultGraph()))
@ -46,7 +47,7 @@ class TestStore(unittest.TestCase):
self.assertIn(Quad(foo, bar, baz, graph), store)
self.assertNotIn(Quad(foo, bar, baz, foo), store)
def test_iter(self):
def test_iter(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, DefaultGraph()))
store.add(Quad(foo, bar, baz, graph))
@ -55,7 +56,7 @@ class TestStore(unittest.TestCase):
{Quad(foo, bar, baz, DefaultGraph()), Quad(foo, bar, baz, graph)},
)
def test_quads_for_pattern(self):
def test_quads_for_pattern(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, DefaultGraph()))
store.add(Quad(foo, bar, baz, graph))
@ -76,26 +77,26 @@ class TestStore(unittest.TestCase):
{Quad(foo, bar, baz, DefaultGraph())},
)
def test_ask_query(self):
def test_ask_query(self) -> None:
store = Store()
store.add(Quad(foo, foo, foo))
self.assertTrue(store.query("ASK { ?s ?s ?s }"))
self.assertFalse(store.query("ASK { FILTER(false) }"))
def test_construct_query(self):
def test_construct_query(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
results = store.query("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }")
results: Any = store.query("CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }")
self.assertIsInstance(results, QueryTriples)
self.assertEqual(
set(results),
{Triple(foo, bar, baz)},
)
def test_select_query(self):
def test_select_query(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz))
solutions = store.query("SELECT ?s ?o WHERE { ?s ?p ?o }")
solutions: Any = store.query("SELECT ?s ?o WHERE { ?s ?p ?o }")
self.assertIsInstance(solutions, QuerySolutions)
self.assertEqual(solutions.variables, [Variable("s"), Variable("o")])
solution = next(solutions)
@ -110,10 +111,11 @@ class TestStore(unittest.TestCase):
self.assertEqual(s, foo)
self.assertEqual(o, baz)
def test_select_query_union_default_graph(self):
def test_select_query_union_default_graph(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, graph))
self.assertEqual(len(list(store.query("SELECT ?s WHERE { ?s ?p ?o }"))), 0)
results: Any = store.query("SELECT ?s WHERE { ?s ?p ?o }")
self.assertEqual(len(list(results)), 0)
results = store.query(
"SELECT ?s WHERE { ?s ?p ?o }", use_default_graph_as_union=True
)
@ -125,13 +127,14 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(len(list(results)), 1)
def test_select_query_with_default_graph(self):
def test_select_query_with_default_graph(self) -> None:
store = Store()
graph_bnode = BlankNode("g")
store.add(Quad(foo, bar, baz, graph))
store.add(Quad(foo, bar, foo))
store.add(Quad(foo, bar, bar, graph_bnode))
self.assertEqual(len(list(store.query("SELECT ?s WHERE { ?s ?p ?o }"))), 1)
results: Any = store.query("SELECT ?s WHERE { ?s ?p ?o }")
self.assertEqual(len(list(results)), 1)
results = store.query("SELECT ?s WHERE { ?s ?p ?o }", default_graph=graph)
self.assertEqual(len(list(results)), 1)
results = store.query(
@ -140,52 +143,52 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(len(list(results)), 3)
def test_select_query_with_named_graph(self):
def test_select_query_with_named_graph(self) -> None:
store = Store()
graph_bnode = BlankNode("g")
store.add(Quad(foo, bar, baz, graph))
store.add(Quad(foo, bar, foo))
store.add(Quad(foo, bar, bar, graph_bnode))
store.add(Quad(foo, bar, bar, foo))
results = store.query(
results: Any = store.query(
"SELECT ?s WHERE { GRAPH ?g { ?s ?p ?o } }",
named_graphs=[graph, graph_bnode],
)
self.assertEqual(len(list(results)), 2)
def test_update_insert_data(self):
def test_update_insert_data(self) -> None:
store = Store()
store.update("INSERT DATA { <http://foo> <http://foo> <http://foo> }")
self.assertEqual(len(store), 1)
def test_update_delete_data(self):
def test_update_delete_data(self) -> None:
store = Store()
store.add(Quad(foo, foo, foo))
store.update("DELETE DATA { <http://foo> <http://foo> <http://foo> }")
self.assertEqual(len(store), 0)
def test_update_delete_where(self):
def test_update_delete_where(self) -> None:
store = Store()
store.add(Quad(foo, foo, foo))
store.update("DELETE WHERE { ?v ?v ?v }")
self.assertEqual(len(store), 0)
def test_update_load(self):
def test_update_load(self) -> None:
store = Store()
store.update("LOAD <https://www.w3.org/1999/02/22-rdf-syntax-ns>")
self.assertGreater(len(store), 100)
def test_update_star(self):
def test_update_star(self) -> None:
store = Store()
store.update(
"PREFIX : <http://www.example.org/> INSERT DATA { :alice :claims << :bob :age 23 >> }"
)
results = store.query(
results: Any = store.query(
"PREFIX : <http://www.example.org/> SELECT ?p ?a WHERE { ?p :claims << :bob :age ?a >> }"
)
self.assertEqual(len(list(results)), 1)
def test_load_ntriples_to_default_graph(self):
def test_load_ntriples_to_default_graph(self) -> None:
store = Store()
store.load(
BytesIO(b"<http://foo> <http://bar> <http://baz> ."),
@ -193,7 +196,7 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(set(store), {Quad(foo, bar, baz, DefaultGraph())})
def test_load_ntriples_to_named_graph(self):
def test_load_ntriples_to_named_graph(self) -> None:
store = Store()
store.load(
BytesIO(b"<http://foo> <http://bar> <http://baz> ."),
@ -202,7 +205,7 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(set(store), {Quad(foo, bar, baz, graph)})
def test_load_turtle_with_base_iri(self):
def test_load_turtle_with_base_iri(self) -> None:
store = Store()
store.load(
BytesIO(b"<http://foo> <http://bar> <> ."),
@ -211,7 +214,7 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(set(store), {Quad(foo, bar, baz, DefaultGraph())})
def test_load_nquads(self):
def test_load_nquads(self) -> None:
store = Store()
store.load(
BytesIO(b"<http://foo> <http://bar> <http://baz> <http://graph>."),
@ -219,7 +222,7 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(set(store), {Quad(foo, bar, baz, graph)})
def test_load_trig_with_base_iri(self):
def test_load_trig_with_base_iri(self) -> None:
store = Store()
store.load(
BytesIO(b"<http://graph> { <http://foo> <http://bar> <> . }"),
@ -228,7 +231,7 @@ class TestStore(unittest.TestCase):
)
self.assertEqual(set(store), {Quad(foo, bar, baz, graph)})
def test_load_file(self):
def test_load_file(self) -> None:
with NamedTemporaryFile(delete=False) as fp:
file_name = fp.name
fp.write(b"<http://foo> <http://bar> <http://baz> <http://graph>.")
@ -237,14 +240,14 @@ class TestStore(unittest.TestCase):
os.remove(file_name)
self.assertEqual(set(store), {Quad(foo, bar, baz, graph)})
def test_load_with_io_error(self):
def test_load_with_io_error(self) -> None:
class BadIO(RawIOBase):
pass
with self.assertRaises(NotImplementedError) as _:
Store().load(BadIO(), mime_type="application/n-triples")
def test_dump_ntriples(self):
def test_dump_ntriples(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, graph))
output = BytesIO()
@ -254,7 +257,7 @@ class TestStore(unittest.TestCase):
b"<http://foo> <http://bar> <http://baz> .\n",
)
def test_dump_nquads(self):
def test_dump_nquads(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, graph))
output = BytesIO()
@ -264,7 +267,7 @@ class TestStore(unittest.TestCase):
b"<http://foo> <http://bar> <http://baz> <http://graph> .\n",
)
def test_dump_file(self):
def test_dump_file(self) -> None:
with NamedTemporaryFile(delete=False) as fp:
file_name = fp.name
store = Store()
@ -277,14 +280,14 @@ class TestStore(unittest.TestCase):
"<http://foo> <http://bar> <http://baz> <http://graph> .\n",
)
def test_dump_with_io_error(self):
def test_dump_with_io_error(self) -> None:
class BadIO(RawIOBase):
pass
with self.assertRaises(OSError) as _:
Store().dump(BadIO(), mime_type="application/rdf+xml")
def test_write_in_read(self):
def test_write_in_read(self) -> None:
store = Store()
store.add(Quad(foo, bar, bar))
store.add(Quad(foo, bar, baz))
@ -292,12 +295,12 @@ class TestStore(unittest.TestCase):
store.add(Quad(triple.object, triple.predicate, triple.subject))
self.assertEqual(len(store), 4)
def test_add_graph(self):
def test_add_graph(self) -> None:
store = Store()
store.add_graph(graph)
self.assertEqual(list(store.named_graphs()), [graph])
def test_remove_graph(self):
def test_remove_graph(self) -> None:
store = Store()
store.add(Quad(foo, bar, baz, graph))
store.add_graph(NamedNode("http://graph2"))
