WIP: Transactions

pull/171/head
Tpt 3 years ago
parent 9f414c13fd
commit 607aa0b0dd
  1. 25
      lib/src/sparql/dataset.rs
  2. 5
      lib/src/sparql/mod.rs
  3. 76
      lib/src/sparql/update.rs
  4. 4
      lib/src/storage/backend/mod.rs
  5. 402
      lib/src/storage/backend/rocksdb.rs
  6. 7
      lib/src/storage/binary_encoder.rs
  7. 458
      lib/src/storage/mod.rs
  8. 31
      lib/src/storage/numeric_encoder.rs
  9. 59
      lib/src/store.rs
  10. 19
      lib/tests/store.rs
  11. 71
      rocksdb-sys/api/c.cc
  12. 29
      rocksdb-sys/api/c.h

@ -2,20 +2,21 @@ use crate::model::TermRef;
use crate::sparql::algebra::QueryDataset;
use crate::sparql::EvaluationError;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use crate::storage::Storage;
use crate::storage::StorageReader;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Infallible;
use std::iter::empty;
pub struct DatasetView {
storage: Storage,
reader: StorageReader,
extra: RefCell<HashMap<StrHash, String>>,
dataset: EncodedDatasetSpec,
}
impl DatasetView {
pub fn new(storage: Storage, dataset: &QueryDataset) -> Self {
pub fn new(reader: StorageReader, dataset: &QueryDataset) -> Self {
let dataset = EncodedDatasetSpec {
default: dataset
.default_graph_graphs()
@ -25,7 +26,7 @@ impl DatasetView {
.map(|graphs| graphs.iter().map(|g| g.as_ref().into()).collect::<Vec<_>>()),
};
Self {
storage,
reader,
extra: RefCell::new(HashMap::default()),
dataset,
}
@ -38,7 +39,7 @@ impl DatasetView {
object: Option<&EncodedTerm>,
graph_name: Option<&EncodedTerm>,
) -> impl Iterator<Item = Result<EncodedQuad, EvaluationError>> + 'static {
self.storage
self.reader
.quads_for_pattern(subject, predicate, object, graph_name)
.map(|t| t.map_err(|e| e.into()))
}
@ -140,15 +141,17 @@ impl DatasetView {
pub fn encode_term<'a>(&self, term: impl Into<TermRef<'a>>) -> EncodedTerm {
let term = term.into();
let encoded = term.into();
insert_term(term, &encoded, &mut |key, value| {
self.insert_str(key, value)
});
insert_term::<Infallible, _>(term, &encoded, &mut |key, value| {
self.insert_str(key, value);
Ok(())
})
.unwrap();
encoded
}
pub fn insert_str(&self, key: &StrHash, value: &str) {
if let Entry::Vacant(e) = self.extra.borrow_mut().entry(*key) {
if !matches!(self.storage.contains_str(key), Ok(true)) {
if !matches!(self.reader.contains_str(key), Ok(true)) {
e.insert(value.to_owned());
}
}
@ -162,12 +165,12 @@ impl StrLookup for DatasetView {
Ok(if let Some(value) = self.extra.borrow().get(key) {
Some(value.clone())
} else {
self.storage.get_str(key)?
self.reader.get_str(key)?
})
}
fn contains_str(&self, key: &StrHash) -> Result<bool, EvaluationError> {
Ok(self.extra.borrow().contains_key(key) || self.storage.contains_str(key)?)
Ok(self.extra.borrow().contains_key(key) || self.reader.contains_str(key)?)
}
}

@ -42,7 +42,7 @@ pub(crate) fn evaluate_query(
options: QueryOptions,
) -> Result<QueryResults, EvaluationError> {
let query = query.try_into().map_err(std::convert::Into::into)?;
let dataset = DatasetView::new(storage, &query.dataset);
let dataset = DatasetView::new(storage.snapshot(), &query.dataset);
match query.inner {
spargebra::Query::Select {
pattern, base_iri, ..
@ -181,6 +181,5 @@ pub(crate) fn evaluate_update(
update: Update,
options: UpdateOptions,
) -> Result<(), EvaluationError> {
SimpleUpdateEvaluator::new(storage, update.inner.base_iri.map(Rc::new), options)
.eval_all(&update.inner.operations, &update.using_datasets)
SimpleUpdateEvaluator::run(storage, update, options)
}

@ -9,7 +9,7 @@ use crate::sparql::eval::SimpleEvaluator;
use crate::sparql::http::Client;
use crate::sparql::plan::EncodedTuple;
use crate::sparql::plan_builder::PlanBuilder;
use crate::sparql::{EvaluationError, UpdateOptions};
use crate::sparql::{EvaluationError, Update, UpdateOptions};
use crate::storage::io::load_graph;
use crate::storage::numeric_encoder::{Decoder, EncodedTerm};
use crate::storage::{Storage, StorageWriter};
@ -27,39 +27,45 @@ use std::io;
use std::io::BufReader;
use std::rc::Rc;
pub struct SimpleUpdateEvaluator<'a> {
storage: &'a Storage,
writer: StorageWriter,
pub struct SimpleUpdateEvaluator {
transaction: StorageWriter,
base_iri: Option<Rc<Iri<String>>>,
options: UpdateOptions,
client: Client,
}
impl<'a> SimpleUpdateEvaluator<'a> {
pub fn new(
storage: &'a Storage,
base_iri: Option<Rc<Iri<String>>>,
impl SimpleUpdateEvaluator {
pub fn run(
storage: &Storage,
update: Update,
options: UpdateOptions,
) -> Self {
) -> Result<(), EvaluationError> {
let client = Client::new(options.query_options.http_timeout);
let writer = storage.atomic_writer();
Self {
storage,
writer,
base_iri,
let mut evaluator = Self {
transaction: storage.transaction(),
base_iri: update.inner.base_iri.map(Rc::new),
options,
client,
};
match evaluator.eval_all(&update.inner.operations, &update.using_datasets) {
Ok(_) => {
evaluator.transaction.commit()?;
Ok(())
}
Err(e) => {
evaluator.transaction.rollback()?;
Err(e)
}
}
}
pub fn eval_all(
fn eval_all(
&mut self,
updates: &[GraphUpdateOperation],
using_datasets: &[Option<QueryDataset>],
) -> Result<(), EvaluationError> {
for (update, using_dataset) in updates.iter().zip(using_datasets) {
self.eval(update, using_dataset)?;
self.writer.commit()?;
}
Ok(())
}
@ -103,7 +109,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
let mut bnodes = HashMap::new();
for quad in data {
let quad = Self::convert_quad(quad, &mut bnodes);
self.writer.insert(quad.as_ref())?;
self.transaction.insert(quad.as_ref())?;
}
Ok(())
}
@ -111,7 +117,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
fn eval_delete_data(&mut self, data: &[GroundQuad]) -> Result<(), EvaluationError> {
for quad in data {
let quad = Self::convert_ground_quad(quad);
self.writer.remove(quad.as_ref())?;
self.transaction.remove(quad.as_ref())?;
}
Ok(())
}
@ -123,7 +129,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
using: &QueryDataset,
algebra: &GraphPattern,
) -> Result<(), EvaluationError> {
let dataset = Rc::new(DatasetView::new(self.storage.clone(), using));
let dataset = Rc::new(DatasetView::new(self.transaction.reader(), using));
let (plan, variables) = PlanBuilder::build(dataset.as_ref(), algebra, false)?;
let evaluator = SimpleEvaluator::new(
dataset.clone(),
@ -137,7 +143,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
if let Some(quad) =
Self::convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)?
{
self.writer.remove(quad.as_ref())?;
self.transaction.remove(quad.as_ref())?;
if !insert.is_empty() {
// Hack to make sure the triple terms are still available for an insert
dataset.encode_term(quad.subject.as_ref());
@ -150,7 +156,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
if let Some(quad) =
Self::convert_quad_pattern(quad, &variables, &tuple, &dataset, &mut bnodes)?
{
self.writer.insert(quad.as_ref())?;
self.transaction.insert(quad.as_ref())?;
}
}
bnodes.clear();
@ -174,7 +180,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
GraphName::DefaultGraph => GraphNameRef::DefaultGraph,
};
load_graph(
&mut self.writer,
&mut self.transaction,
BufReader::new(body),
format,
to_graph_name,
@ -186,7 +192,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
fn eval_create(&mut self, graph_name: &NamedNode, silent: bool) -> Result<(), EvaluationError> {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.writer.insert_named_graph(graph_name.into())? || silent {
if self.transaction.insert_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
@ -200,8 +206,12 @@ impl<'a> SimpleUpdateEvaluator<'a> {
match graph {
GraphTarget::NamedNode(graph_name) => {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.contains_named_graph(&graph_name.into())? {
Ok(self.writer.clear_graph(graph_name.into())?)
if self
.transaction
.reader()
.contains_named_graph(&graph_name.into())?
{
Ok(self.transaction.clear_graph(graph_name.into())?)
} else if silent {
Ok(())
} else {
@ -212,11 +222,11 @@ impl<'a> SimpleUpdateEvaluator<'a> {
}
}
GraphTarget::DefaultGraph => {
self.writer.clear_graph(GraphNameRef::DefaultGraph)?;
self.transaction.clear_graph(GraphNameRef::DefaultGraph)?;
Ok(())
}
GraphTarget::NamedGraphs => Ok(self.writer.clear_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.writer.clear_all_graphs()?),
GraphTarget::NamedGraphs => Ok(self.transaction.clear_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.transaction.clear_all_graphs()?),
}
}
@ -224,7 +234,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
match graph {
GraphTarget::NamedNode(graph_name) => {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.writer.remove_named_graph(graph_name.into())? || silent {
if self.transaction.remove_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
@ -233,9 +243,11 @@ impl<'a> SimpleUpdateEvaluator<'a> {
)))
}
}
GraphTarget::DefaultGraph => Ok(self.writer.clear_graph(GraphNameRef::DefaultGraph)?),
GraphTarget::NamedGraphs => Ok(self.writer.remove_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.writer.clear()?),
GraphTarget::DefaultGraph => {
Ok(self.transaction.clear_graph(GraphNameRef::DefaultGraph)?)
}
GraphTarget::NamedGraphs => Ok(self.transaction.remove_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.transaction.clear()?),
}
}

@ -7,8 +7,8 @@ pub use fallback::{
};
#[cfg(not(target_arch = "wasm32"))]
pub use rocksdb::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, SstFileWriter,
WriteBatchWithIndex,
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, Reader, SstFileWriter,
Transaction,
};
use std::ffi::CString;

@ -10,12 +10,14 @@ use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t};
use oxrocksdb_sys::*;
use rand::random;
use std::borrow::Borrow;
use std::cell::Cell;
use std::env::temp_dir;
use std::ffi::{CStr, CString};
use std::io::{Error, ErrorKind, Result};
use std::iter::Zip;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
use std::{ptr, slice};
@ -58,7 +60,8 @@ unsafe impl Sync for Db {}
struct DbHandler {
db: *mut rocksdb_transactiondb_t,
options: *mut rocksdb_options_t,
txn_options: *mut rocksdb_transactiondb_options_t,
transaction_options: *mut rocksdb_transaction_options_t,
transactiondb_options: *mut rocksdb_transactiondb_options_t,
read_options: *mut rocksdb_readoptions_t,
write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t,
@ -90,7 +93,8 @@ impl Drop for DbHandler {
rocksdb_envoptions_destroy(self.env_options);
rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options);
rocksdb_compactoptions_destroy(self.compaction_options);
rocksdb_transactiondb_options_destroy(self.txn_options);
rocksdb_transaction_options_destroy(self.transaction_options);
rocksdb_transactiondb_options_destroy(self.transactiondb_options);
rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options);
if let Some(env) = self.env {
@ -177,9 +181,9 @@ impl Db {
);
rocksdb_options_set_block_based_table_factory(options, block_based_table_options);
let txn_options = rocksdb_transactiondb_options_create();
let transactiondb_options = rocksdb_transactiondb_options_create();
assert!(
!txn_options.is_null(),
!transactiondb_options.is_null(),
"rocksdb_transactiondb_options_create returned null"
);
@ -187,7 +191,7 @@ impl Db {
let env = rocksdb_create_mem_env();
if env.is_null() {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
rocksdb_transactiondb_options_destroy(transactiondb_options);
return Err(other_error("Not able to create an in-memory environment."));
}
rocksdb_options_set_env(options, env);
@ -263,7 +267,7 @@ impl Db {
vec![ptr::null_mut(); column_family_names.len()];
let db = ffi_result!(rocksdb_transactiondb_open_column_families(
options,
txn_options,
transactiondb_options,
c_path.as_ptr(),
c_column_families.len().try_into().unwrap(),
c_column_families
@ -323,10 +327,18 @@ impl Db {
"rocksdb_compactoptions_create returned null"
);
let transaction_options = rocksdb_transaction_options_create();
assert!(
!transaction_options.is_null(),
"rocksdb_transaction_options_create returned null"
);
rocksdb_transaction_options_set_set_snapshot(transaction_options, 1);
Ok(DbHandler {
db,
options,
txn_options,
transaction_options,
transactiondb_options,
read_options,
write_options,
flush_options,
@ -353,6 +365,65 @@ impl Db {
None
}
/// Unsafe reader (data might appear and disapear between two reads)
/// Use [`snapshot`] if you don't want that.
#[must_use]
pub fn reader(&self) -> Reader {
Reader {
inner: InnerReader::Raw(self.0.clone()),
options: unsafe { rocksdb_readoptions_create_copy(self.0.read_options) },
}
}
#[must_use]
pub fn snapshot(&self) -> Reader {
unsafe {
let snapshot = rocksdb_transactiondb_create_snapshot(self.0.db);
assert!(
!snapshot.is_null(),
"rocksdb_transactiondb_create_snapshot returned null"
);
let options = rocksdb_readoptions_create_copy(self.0.read_options);
rocksdb_readoptions_set_snapshot(options, snapshot);
Reader {
inner: InnerReader::Snapshot(Rc::new(InnerSnapshot {
db: self.0.clone(),
snapshot,
})),
options,
}
}
}
#[must_use]
pub fn transaction(&self) -> Transaction {
unsafe {
let transaction = rocksdb_transaction_begin(
self.0.db,
self.0.write_options,
self.0.transaction_options,
ptr::null_mut(),
);
assert!(
!transaction.is_null(),
"rocksdb_transaction_begin returned null"
);
let options = rocksdb_readoptions_create_copy(self.0.read_options);
rocksdb_readoptions_set_snapshot(
options,
rocksdb_transaction_get_snapshot(transaction),
);
Transaction {
inner: Rc::new(InnerTransaction {
transaction,
is_ended: Cell::new(false),
_db: self.0.clone(),
}),
read_options: options,
}
}
}
pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf(
@ -378,15 +449,108 @@ impl Db {
}
}
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
pub fn new_sst_file(&self) -> Result<SstFileWriter> {
unsafe {
let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf(
self.0.db,
self.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
let path = self.0.path.join(random::<u128>().to_string());
let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
ffi_result!(rocksdb_sstfilewriter_open(
writer,
path_to_cstring(&path)?.as_ptr()
))?;
Ok(SstFileWriter { writer, path })
}
}
pub fn write_stt_files(&self, ssts_for_cf: Vec<(&ColumnFamily, PathBuf)>) -> Result<()> {
for (cf, path) in ssts_for_cf {
unsafe {
ffi_result!(rocksdb_transactiondb_ingest_external_file_cf(
self.0.db,
cf.0,
&path_to_cstring(&path)?.as_ptr(),
1,
self.0.ingest_external_file_options
))?;
}
}
Ok(())
}
}
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
// So, no use after free possible.
#[derive(Clone)]
pub struct ColumnFamily(*mut rocksdb_column_family_handle_t);
unsafe impl Send for ColumnFamily {}
unsafe impl Sync for ColumnFamily {}
pub struct Reader {
inner: InnerReader,
options: *mut rocksdb_readoptions_t,
}
#[derive(Clone)]
enum InnerReader {
Raw(Arc<DbHandler>),
Snapshot(Rc<InnerSnapshot>),
Transaction(Rc<InnerTransaction>),
}
struct InnerSnapshot {
db: Arc<DbHandler>,
snapshot: *const rocksdb_snapshot_t,
}
impl Drop for InnerSnapshot {
fn drop(&mut self) {
unsafe { rocksdb_transactiondb_release_snapshot(self.db.db, self.snapshot) }
}
}
impl Clone for Reader {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
options: unsafe { rocksdb_readoptions_create_copy(self.options) },
}
}
}
impl Drop for Reader {
fn drop(&mut self) {
unsafe { rocksdb_readoptions_destroy(self.options) }
}
}
impl Reader {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
unsafe {
let slice = match &self.inner {
InnerReader::Raw(inner) => ffi_result!(rocksdb_transactiondb_get_pinned_cf(
inner.db,
self.options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
)),
InnerReader::Snapshot(inner) => ffi_result!(rocksdb_transactiondb_get_pinned_cf(
inner.db.db,
self.options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
)),
InnerReader::Transaction(inner) => {
ffi_result!(rocksdb_transaction_get_pinned_cf(
inner.transaction,
self.options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
))
}
}?;
Ok(if slice.is_null() {
None
} else {
@ -399,33 +563,12 @@ impl Db {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn new_batch(&self) -> WriteBatchWithIndex {
unsafe {
let batch = rocksdb_writebatch_wi_create(0, 0);
assert!(!batch.is_null(), "rocksdb_writebatch_create returned null");
WriteBatchWithIndex {
batch,
db: self.clone(),
}
}
}
pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_write_writebatch_wi(
self.0.db,
self.0.write_options,
batch.batch
))
}?;
batch.clear();
Ok(())
}
#[must_use]
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[])
}
#[must_use]
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter {
//We generate the upper bound
let upper_bound = {
@ -446,7 +589,7 @@ impl Db {
};
unsafe {
let options = rocksdb_readoptions_create();
let options = rocksdb_readoptions_create_copy(self.options);
assert!(
!options.is_null(),
"rocksdb_readoptions_create returned null"
@ -458,8 +601,19 @@ impl Db {
upper_bound.len(),
);
}
let iter =
rocksdb_transactiondb_create_iterator_cf(self.0.db, options, column_family.0);
let iter = match &self.inner {
InnerReader::Raw(inner) => {
rocksdb_transactiondb_create_iterator_cf(inner.db, options, column_family.0)
}
InnerReader::Snapshot(inner) => {
rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0)
}
InnerReader::Transaction(inner) => rocksdb_transaction_create_iterator_cf(
inner.transaction,
options,
column_family.0,
),
};
assert!(!iter.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() {
rocksdb_iter_seek_to_first(iter);
@ -471,7 +625,7 @@ impl Db {
iter,
options,
_upper_bound: upper_bound,
_db: self.0.clone(),
_reader: self.clone(),
is_currently_valid,
}
}
@ -493,133 +647,121 @@ impl Db {
iter.status()?; // We makes sure there is no read problem
Ok(!iter.is_valid())
}
}
pub fn new_sst_file(&self) -> Result<SstFileWriter> {
unsafe {
let path = self.0.path.join(random::<u128>().to_string());
let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
ffi_result!(rocksdb_sstfilewriter_open(
writer,
path_to_cstring(&path)?.as_ptr()
))?;
Ok(SstFileWriter { writer, path })
}
pub struct Transaction {
inner: Rc<InnerTransaction>,
read_options: *mut rocksdb_readoptions_t,
}
struct InnerTransaction {
transaction: *mut rocksdb_transaction_t,
is_ended: Cell<bool>,
_db: Arc<DbHandler>, // Used to ensure that the transaction is not outliving the DB
}
impl Drop for Transaction {
fn drop(&mut self) {
unsafe { rocksdb_readoptions_destroy(self.read_options) }
}
}
pub fn write_stt_files(&self, ssts_for_cf: Vec<(&ColumnFamily, PathBuf)>) -> Result<()> {
for (cf, path) in ssts_for_cf {
unsafe {
ffi_result!(rocksdb_transactiondb_ingest_external_file_cf(
self.0.db,
cf.0,
&path_to_cstring(&path)?.as_ptr(),
1,
self.0.ingest_external_file_options
))?;
}
impl Drop for InnerTransaction {
fn drop(&mut self) {
if !self.is_ended.get() {
unsafe { ffi_result!(rocksdb_transaction_rollback(self.transaction)) }.unwrap();
}
Ok(())
unsafe { rocksdb_transaction_destroy(self.transaction) }
}
}
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
// So, no use after free possible.
#[derive(Clone)]
pub struct ColumnFamily(*mut rocksdb_column_family_handle_t);
impl Transaction {
pub fn commit(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_commit(self.inner.transaction)) }
}
unsafe impl Send for ColumnFamily {}
unsafe impl Sync for ColumnFamily {}
pub fn rollback(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_rollback(self.inner.transaction)) }
}
pub struct WriteBatchWithIndex {
batch: *mut rocksdb_writebatch_wi_t,
db: Db,
}
pub fn reader(&self) -> Reader {
Reader {
inner: InnerReader::Transaction(self.inner.clone()),
options: unsafe { rocksdb_readoptions_create_copy(self.read_options) },
}
}
impl Drop for WriteBatchWithIndex {
fn drop(&mut self) {
unsafe { rocksdb_writebatch_wi_destroy(self.batch) }
pub fn get_for_update(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<Option<PinnableSlice>> {
unsafe {
let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf(
self.inner.transaction,
self.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
))?;
Ok(if slice.is_null() {
None
} else {
Some(PinnableSlice(slice))
})
}
}
pub fn contains_key_for_update(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<bool> {
Ok(self.get_for_update(column_family, key)?.is_some()) //TODO: optimize
}
}
impl WriteBatchWithIndex {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe {
rocksdb_writebatch_wi_put_cf(
self.batch,
ffi_result!(rocksdb_transaction_put_cf(
self.inner.transaction,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
)
))
}
}
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) {
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.insert(column_family, key, &[])
}
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) {
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
unsafe {
rocksdb_writebatch_wi_delete_cf(
self.batch,
ffi_result!(rocksdb_transaction_delete_cf(
self.inner.transaction,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
)
))
}
}
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe {
rocksdb_writebatch_wi_merge_cf(
self.batch,
ffi_result!(rocksdb_transaction_merge_cf(
self.inner.transaction,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
)
}
}
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Buffer>> {
unsafe {
let mut len = 0;
let base = ffi_result!(
rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
self.batch,
self.db.0.db,
self.db.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
&mut len
)
)? as *mut u8;
Ok(if base.is_null() {
None
} else {
Some(Buffer { base, len })
})
}
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn clear(&mut self) {
unsafe {
rocksdb_writebatch_wi_clear(self.batch);
))
}
}
pub fn len(&self) -> usize {
unsafe { rocksdb_writebatch_wi_count(self.batch) }
.try_into()
.unwrap()
}
}
pub struct PinnableSlice(*mut rocksdb_pinnableslice_t);
@ -693,7 +835,7 @@ pub struct Iter {
iter: *mut rocksdb_iterator_t,
is_currently_valid: bool,
_upper_bound: Option<Vec<u8>>,
_db: Arc<DbHandler>, // needed to ensure that DB still lives while iter is used
_reader: Reader, // needed to ensure that DB still lives while iter is used
options: *mut rocksdb_readoptions_t, // needed to ensure that options still lives while iter is used
}

@ -666,7 +666,12 @@ mod tests {
impl MemoryStrStore {
fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(term, encoded, &mut |h, v| self.insert_str(h, v))
insert_term(term, encoded, &mut |h, v| {
self.insert_str(h, v);
let r: Result<(), Infallible> = Ok(());
r
})
.unwrap();
}
fn insert_str(&self, key: &StrHash, value: &str) {

@ -1,5 +1,6 @@
use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, Quad, QuadRef, TermRef};
use crate::storage::backend::{Reader, Transaction};
use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
@ -11,12 +12,13 @@ use crate::storage::numeric_encoder::{
};
use backend::{
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator, WriteBatchWithIndex,
MergeOperator,
};
#[cfg(not(target_arch = "wasm32"))]
use std::collections::{hash_map, HashMap, HashSet};
use std::ffi::CString;
use std::io::Result;
use std::mem::swap;
#[cfg(not(target_arch = "wasm32"))]
use std::mem::take;
#[cfg(not(target_arch = "wasm32"))]
@ -41,7 +43,7 @@ const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default";
const AUTO_WRITE_BATCH_THRESHOLD: usize = 1024;
const AUTO_WRITE_BATCH_THRESHOLD: usize = 1024 * 1024;
/// Low level storage primitives
#[derive(Clone)]
@ -217,38 +219,46 @@ impl Storage {
let mut version = this.ensure_version()?;
if version == 0 {
let mut batch = this.db.new_batch();
let mut transaction = this.db.transaction();
let mut size = 0;
// We migrate to v1
for quad in this.quads() {
for quad in this.reader().quads() {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
batch.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name));
if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD {
this.db.write(&mut batch)?;
transaction.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?;
size += 1;
if size % AUTO_WRITE_BATCH_THRESHOLD == 0 {
let mut tr = this.db.transaction();
swap(&mut transaction, &mut tr);
tr.commit()?;
}
}
}
this.db.write(&mut batch)?;
transaction.commit()?;
this.db.flush(&this.graphs_cf)?;
version = 1;
this.update_version(version)?;
}
if version == 1 {
// We migrate to v2
let mut batch = this.db.new_batch();
let mut iter = this.db.iter(&this.id2str_cf);
let mut transaction = this.db.transaction();
let reader = this.db.snapshot();
let mut size = 0;
let mut iter = reader.iter(&this.id2str_cf);
while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&i32::MAX.to_be_bytes());
new_value.extend_from_slice(value);
batch.insert(&this.id2str_cf, key, &new_value);
transaction.insert(&this.id2str_cf, key, &new_value)?;
iter.next();
if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD {
this.db.write(&mut batch)?;
batch.clear();
size += 1;
if size % AUTO_WRITE_BATCH_THRESHOLD == 0 {
let mut tr = this.db.transaction();
swap(&mut transaction, &mut tr);
tr.commit()?;
}
}
this.db.write(&mut batch)?;
transaction.commit()?;
iter.status()?;
this.db.flush(&this.id2str_cf)?;
version = 2;
@ -270,7 +280,7 @@ impl Storage {
fn ensure_version(&self) -> Result<u64> {
Ok(
if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
if let Some(version) = self.reader().reader.get(&self.default_cf, b"oxversion")? {
let mut buffer = [0; 8];
buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer)
@ -282,28 +292,90 @@ impl Storage {
}
fn update_version(&self, version: u64) -> Result<()> {
let mut batch = self.db.new_batch();
batch.insert(&self.default_cf, b"oxversion", &version.to_be_bytes());
self.db.write(&mut batch)?;
let mut transaction = self.db.transaction();
transaction.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
transaction.commit()?;
self.db.flush(&self.default_cf)
}
/// Unsafe reader (data might appear and disapear between two reads)
/// Use [`snapshot`] if you don't want that.
pub fn reader(&self) -> StorageReader {
StorageReader {
reader: self.db.reader(),
storage: self.clone(),
}
}
pub fn snapshot(&self) -> StorageReader {
StorageReader {
reader: self.db.snapshot(),
storage: self.clone(),
}
}
pub fn transaction(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
transaction: self.db.transaction(),
storage: self.clone(),
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> Result<()> {
self.db.flush(&self.default_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gosp_cf)?;
self.db.flush(&self.spog_cf)?;
self.db.flush(&self.posg_cf)?;
self.db.flush(&self.ospg_cf)?;
self.db.flush(&self.dspo_cf)?;
self.db.flush(&self.dpos_cf)?;
self.db.flush(&self.dosp_cf)?;
self.db.flush(&self.id2str_cf)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn compact(&self) -> Result<()> {
self.db.compact(&self.default_cf)?;
self.db.compact(&self.gpos_cf)?;
self.db.compact(&self.gpos_cf)?;
self.db.compact(&self.gosp_cf)?;
self.db.compact(&self.spog_cf)?;
self.db.compact(&self.posg_cf)?;
self.db.compact(&self.ospg_cf)?;
self.db.compact(&self.dspo_cf)?;
self.db.compact(&self.dpos_cf)?;
self.db.compact(&self.dosp_cf)?;
self.db.compact(&self.id2str_cf)
}
}
pub struct StorageReader {
reader: Reader,
storage: Storage,
}
impl StorageReader {
pub fn len(&self) -> Result<usize> {
Ok(self.db.len(&self.gspo_cf)? + self.db.len(&self.dspo_cf)?)
Ok(self.reader.len(&self.storage.gspo_cf)? + self.reader.len(&self.storage.dspo_cf)?)
}
pub fn is_empty(&self) -> Result<bool> {
Ok(self.db.is_empty(&self.gspo_cf)? && self.db.is_empty(&self.dspo_cf)?)
Ok(self.reader.is_empty(&self.storage.gspo_cf)?
&& self.reader.is_empty(&self.storage.dspo_cf)?)
}
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
Ok(self.db.contains_key(&self.dspo_cf, &buffer)?)
Ok(self.reader.contains_key(&self.storage.dspo_cf, &buffer)?)
} else {
write_gspo_quad(&mut buffer, quad);
Ok(self.db.contains_key(&self.gspo_cf, &buffer)?)
Ok(self.reader.contains_key(&self.storage.gspo_cf, &buffer)?)
}
}
@ -543,49 +615,49 @@ impl Storage {
pub fn named_graphs(&self) -> DecodingGraphIterator {
DecodingGraphIterator {
iter: self.db.iter(&self.graphs_cf),
iter: self.reader.iter(&self.storage.graphs_cf),
}
}
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool> {
self.db
.contains_key(&self.graphs_cf, &encode_term(graph_name))
self.reader
.contains_key(&self.storage.graphs_cf, &encode_term(graph_name))
}
fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.spog_cf, prefix, QuadEncoding::Spog)
self.inner_quads(&self.storage.spog_cf, prefix, QuadEncoding::Spog)
}
fn posg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.posg_cf, prefix, QuadEncoding::Posg)
self.inner_quads(&self.storage.posg_cf, prefix, QuadEncoding::Posg)
}
fn ospg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.ospg_cf, prefix, QuadEncoding::Ospg)
self.inner_quads(&self.storage.ospg_cf, prefix, QuadEncoding::Ospg)
}
fn gspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.gspo_cf, prefix, QuadEncoding::Gspo)
self.inner_quads(&self.storage.gspo_cf, prefix, QuadEncoding::Gspo)
}
fn gpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.gpos_cf, prefix, QuadEncoding::Gpos)
self.inner_quads(&self.storage.gpos_cf, prefix, QuadEncoding::Gpos)
}
fn gosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.gosp_cf, prefix, QuadEncoding::Gosp)
self.inner_quads(&self.storage.gosp_cf, prefix, QuadEncoding::Gosp)
}
fn dspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.dspo_cf, prefix, QuadEncoding::Dspo)
self.inner_quads(&self.storage.dspo_cf, prefix, QuadEncoding::Dspo)
}
fn dpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.dpos_cf, prefix, QuadEncoding::Dpos)
self.inner_quads(&self.storage.dpos_cf, prefix, QuadEncoding::Dpos)
}
fn dosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
self.inner_quads(&self.dosp_cf, prefix, QuadEncoding::Dosp)
self.inner_quads(&self.storage.dosp_cf, prefix, QuadEncoding::Dosp)
}
fn inner_quads(
@ -595,62 +667,14 @@ impl Storage {
encoding: QuadEncoding,
) -> DecodingQuadIterator {
DecodingQuadIterator {
iter: self.db.scan_prefix(column_family, prefix),
iter: self.reader.scan_prefix(column_family, prefix),
encoding,
}
}
pub fn atomic_writer(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
batch: self.db.new_batch(),
storage: self.clone(),
auto_commit: false,
}
}
pub fn simple_writer(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
batch: self.db.new_batch(),
storage: self.clone(),
auto_commit: true,
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> Result<()> {
self.db.flush(&self.default_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gosp_cf)?;
self.db.flush(&self.spog_cf)?;
self.db.flush(&self.posg_cf)?;
self.db.flush(&self.ospg_cf)?;
self.db.flush(&self.dspo_cf)?;
self.db.flush(&self.dpos_cf)?;
self.db.flush(&self.dosp_cf)?;
self.db.flush(&self.id2str_cf)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn compact(&self) -> Result<()> {
self.db.compact(&self.default_cf)?;
self.db.compact(&self.gpos_cf)?;
self.db.compact(&self.gpos_cf)?;
self.db.compact(&self.gosp_cf)?;
self.db.compact(&self.spog_cf)?;
self.db.compact(&self.posg_cf)?;
self.db.compact(&self.ospg_cf)?;
self.db.compact(&self.dspo_cf)?;
self.db.compact(&self.dpos_cf)?;
self.db.compact(&self.dosp_cf)?;
self.db.compact(&self.id2str_cf)
}
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
self.db
.get(&self.id2str_cf, &key.to_be_bytes())?
self.reader
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
.and_then(|v| {
let count = i32::from_be_bytes(v[..4].try_into().unwrap());
if count > 0 {
@ -665,8 +689,8 @@ impl Storage {
pub fn contains_str(&self, key: &StrHash) -> Result<bool> {
Ok(self
.db
.get(&self.id2str_cf, &key.to_be_bytes())?
.reader
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
.map_or(false, |v| {
i32::from_be_bytes(v[..4].try_into().unwrap()) > 0
}))
@ -743,7 +767,7 @@ impl Iterator for DecodingGraphIterator {
}
}
impl StrLookup for Storage {
impl StrLookup for StorageReader {
type Error = std::io::Error;
fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
@ -757,86 +781,100 @@ impl StrLookup for Storage {
pub struct StorageWriter {
buffer: Vec<u8>,
batch: WriteBatchWithIndex,
transaction: Transaction,
storage: Storage,
auto_commit: bool,
}
impl StorageWriter {
pub fn reader(&self) -> StorageReader {
StorageReader {
reader: self.transaction.reader(),
storage: self.storage.clone(),
}
}
pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool> {
let encoded = quad.into();
self.buffer.clear();
let result = if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, &encoded);
if self
.batch
.contains_key(&self.storage.dspo_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
{
false
} else {
self.batch.insert_empty(&self.storage.dspo_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.dspo_cf, &self.buffer)?;
self.buffer.clear();
write_pos_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.dpos_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.dpos_cf, &self.buffer)?;
self.buffer.clear();
write_osp_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.dosp_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.dosp_cf, &self.buffer)?;
self.insert_term(quad.subject.into(), &encoded.subject);
self.insert_term(quad.predicate.into(), &encoded.predicate);
self.insert_term(quad.object, &encoded.object);
self.insert_term(quad.subject.into(), &encoded.subject)?;
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
self.insert_term(quad.object, &encoded.object)?;
true
}
} else {
write_spog_quad(&mut self.buffer, &encoded);
if self
.batch
.contains_key(&self.storage.spog_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
{
false
} else {
self.batch.insert_empty(&self.storage.spog_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.spog_cf, &self.buffer)?;
self.buffer.clear();
write_posg_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.posg_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.posg_cf, &self.buffer)?;
self.buffer.clear();
write_ospg_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.ospg_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.ospg_cf, &self.buffer)?;
self.buffer.clear();
write_gspo_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gspo_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.gspo_cf, &self.buffer)?;
self.buffer.clear();
write_gpos_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gpos_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.gpos_cf, &self.buffer)?;
self.buffer.clear();
write_gosp_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gosp_cf, &self.buffer);
self.transaction
.insert_empty(&self.storage.gosp_cf, &self.buffer)?;
self.insert_term(quad.subject.into(), &encoded.subject);
self.insert_term(quad.predicate.into(), &encoded.predicate);
self.insert_term(quad.object, &encoded.object);
self.insert_term(quad.subject.into(), &encoded.subject)?;
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
self.insert_term(quad.object, &encoded.object)?;
self.buffer.clear();
write_term(&mut self.buffer, &encoded.graph_name);
if !self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
{
self.batch
.insert_empty(&self.storage.graphs_cf, &self.buffer);
self.insert_graph_name(quad.graph_name, &encoded.graph_name);
self.transaction
.insert_empty(&self.storage.graphs_cf, &self.buffer)?;
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
true
}
};
self.write_if_needed()?;
Ok(result)
}
@ -846,38 +884,41 @@ impl StorageWriter {
self.buffer.clear();
write_term(&mut self.buffer, &encoded_graph_name);
let result = if self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
{
false
} else {
self.batch
.insert_empty(&self.storage.graphs_cf, &self.buffer);
self.insert_term(graph_name.into(), &encoded_graph_name);
self.transaction
.insert_empty(&self.storage.graphs_cf, &self.buffer)?;
self.insert_term(graph_name.into(), &encoded_graph_name)?;
true
};
self.write_if_needed()?;
Ok(result)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) {
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
insert_term(term, encoded, &mut |key, value| self.insert_str(key, value))
}
fn insert_graph_name(&mut self, graph_name: GraphNameRef<'_>, encoded: &EncodedTerm) {
fn insert_graph_name(
&mut self,
graph_name: GraphNameRef<'_>,
encoded: &EncodedTerm,
) -> Result<()> {
match graph_name {
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::DefaultGraph => (),
GraphNameRef::DefaultGraph => Ok(()),
}
}
fn insert_str(&mut self, key: &StrHash, value: &str) {
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<()> {
self.buffer.clear();
self.buffer.extend_from_slice(&1_i32.to_be_bytes());
self.buffer.extend_from_slice(value.as_bytes());
self.batch
.merge(&self.storage.id2str_cf, &key.to_be_bytes(), &self.buffer);
self.transaction
.merge(&self.storage.id2str_cf, &key.to_be_bytes(), &self.buffer)
}
pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool> {
@ -890,22 +931,25 @@ impl StorageWriter {
write_spo_quad(&mut self.buffer, quad);
if self
.batch
.contains_key(&self.storage.dspo_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
{
self.batch.remove(&self.storage.dspo_cf, &self.buffer);
self.transaction
.remove(&self.storage.dspo_cf, &self.buffer)?;
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.dpos_cf, &self.buffer);
self.transaction
.remove(&self.storage.dpos_cf, &self.buffer)?;
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.dosp_cf, &self.buffer);
self.transaction
.remove(&self.storage.dosp_cf, &self.buffer)?;
self.remove_term(&quad.subject);
self.remove_term(&quad.predicate);
self.remove_term(&quad.object);
self.remove_term(&quad.subject)?;
self.remove_term(&quad.predicate)?;
self.remove_term(&quad.object)?;
true
} else {
false
@ -914,59 +958,64 @@ impl StorageWriter {
write_spog_quad(&mut self.buffer, quad);
if self
.batch
.contains_key(&self.storage.spog_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
{
self.batch.remove(&self.storage.spog_cf, &self.buffer);
self.transaction
.remove(&self.storage.spog_cf, &self.buffer)?;
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.posg_cf, &self.buffer);
self.transaction
.remove(&self.storage.posg_cf, &self.buffer)?;
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.ospg_cf, &self.buffer);
self.transaction
.remove(&self.storage.ospg_cf, &self.buffer)?;
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gspo_cf, &self.buffer);
self.transaction
.remove(&self.storage.gspo_cf, &self.buffer)?;
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gpos_cf, &self.buffer);
self.transaction
.remove(&self.storage.gpos_cf, &self.buffer)?;
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gosp_cf, &self.buffer);
self.transaction
.remove(&self.storage.gosp_cf, &self.buffer)?;
self.remove_term(&quad.subject);
self.remove_term(&quad.predicate);
self.remove_term(&quad.object);
self.remove_term(&quad.subject)?;
self.remove_term(&quad.predicate)?;
self.remove_term(&quad.object)?;
true
} else {
false
}
};
self.write_if_needed()?;
Ok(result)
}
pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<()> {
for quad in self.storage.quads_for_graph(&graph_name.into()) {
for quad in self.reader().quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_named_graphs(&mut self) -> Result<()> {
for quad in self.storage.quads_in_named_graph() {
for quad in self.reader().quads_in_named_graph() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_graphs(&mut self) -> Result<()> {
for quad in self.storage.quads() {
for quad in self.reader().quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
@ -977,64 +1026,60 @@ impl StorageWriter {
}
fn remove_encoded_named_graph(&mut self, graph_name: &EncodedTerm) -> Result<bool> {
for quad in self.storage.quads_for_graph(graph_name) {
for quad in self.reader().quads_for_graph(graph_name) {
self.remove_encoded(&quad?)?;
}
self.buffer.clear();
write_term(&mut self.buffer, graph_name);
let result = if self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
.transaction
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
{
self.batch.remove(&self.storage.graphs_cf, &self.buffer);
self.remove_term(graph_name);
self.transaction
.remove(&self.storage.graphs_cf, &self.buffer)?;
self.remove_term(graph_name)?;
true
} else {
false
};
self.write_if_needed()?;
Ok(result)
}
pub fn remove_all_named_graphs(&mut self) -> Result<()> {
for graph_name in self.storage.named_graphs() {
for graph_name in self.reader().named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
Ok(())
}
pub fn clear(&mut self) -> Result<()> {
for graph_name in self.storage.named_graphs() {
for graph_name in self.reader().named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
for quad in self.storage.quads() {
for quad in self.reader().quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
fn remove_term(&mut self, encoded: &EncodedTerm) {
remove_term(encoded, &mut |key| self.remove_str(key));
fn remove_term(&mut self, encoded: &EncodedTerm) -> Result<()> {
remove_term(encoded, &mut |key| self.remove_str(key))
}
fn remove_str(&mut self, key: &StrHash) {
self.batch.merge(
fn remove_str(&mut self, key: &StrHash) -> Result<()> {
self.transaction.merge(
&self.storage.id2str_cf,
&key.to_be_bytes(),
&(-1_i32).to_be_bytes(),
)
}
fn write_if_needed(&mut self) -> Result<()> {
if self.auto_commit && self.batch.len() > AUTO_WRITE_BATCH_THRESHOLD {
self.commit()?;
}
Ok(())
pub fn commit(self) -> Result<()> {
self.transaction.commit()
}
pub fn commit(&mut self) -> Result<()> {
self.storage.db.write(&mut self.batch)?;
Ok(())
pub fn rollback(self) -> Result<()> {
self.transaction.rollback()
}
}
@ -1067,14 +1112,14 @@ impl BulkLoader {
let encoded = EncodedQuad::from(quad.as_ref());
if quad.graph_name.is_default_graph() {
if self.triples.insert(encoded.clone()) {
self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
self.insert_term(quad.object.as_ref(), &encoded.object);
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?;
}
} else if self.quads.insert(encoded.clone()) {
self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
self.insert_term(quad.object.as_ref(), &encoded.object);
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?;
if self.graphs.insert(encoded.graph_name.clone()) {
self.insert_term(
match quad.graph_name.as_ref() {
@ -1083,7 +1128,7 @@ impl BulkLoader {
GraphNameRef::DefaultGraph => unreachable!(),
},
&encoded.graph_name,
);
)?;
}
}
count += 1;
@ -1223,11 +1268,9 @@ impl BulkLoader {
self.storage.db.write_stt_files(to_load)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(
term,
encoded,
&mut |key, value| match self.id2str.entry(*key) {
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
insert_term(term, encoded, &mut |key, value| {
match self.id2str.entry(*key) {
hash_map::Entry::Occupied(mut e) => {
let e = e.get_mut();
e.0 = e.0.wrapping_add(1);
@ -1235,8 +1278,9 @@ impl BulkLoader {
hash_map::Entry::Vacant(e) => {
e.insert((1, value.into()));
}
},
)
};
Ok(())
})
}
fn build_sst_for_keys(&self, values: impl Iterator<Item = Vec<u8>>) -> Result<PathBuf> {
@ -1255,6 +1299,27 @@ mod tests {
use super::*;
use crate::model::NamedNodeRef;
#[test]
fn test_transaction_isolation() -> Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let storage = Storage::new()?;
let mut t1 = storage.transaction();
let snapshot = storage.snapshot();
t1.insert(quad)?;
t1.commit()?;
assert_eq!(snapshot.len()?, 0);
let mut t2 = storage.transaction();
let mut t3 = storage.transaction();
t2.insert(quad)?;
assert!(t3.remove(quad).is_err()); // Already locked
Ok(())
}
#[test]
fn test_strings_removal() -> Result<()> {
let quad = QuadRef::new(
@ -1271,37 +1336,40 @@ mod tests {
);
let storage = Storage::new()?;
let mut writer = storage.atomic_writer();
let reader = storage.reader();
let mut writer = storage.transaction();
writer.insert(quad)?;
writer.insert(quad2)?;
writer.remove(quad2)?;
writer.commit()?;
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/s"))?
.is_some());
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/p"))?
.is_some());
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/o2"))?
.is_none());
let mut writer = storage.transaction();
writer.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.commit()?;
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/s"))?
.is_none());
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/p"))?
.is_none());
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/o"))?
.is_none());
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/g"))?
.is_some());
let mut writer = storage.transaction();
writer.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.commit()?;
assert!(storage
assert!(reader
.get_str(&StrHash::new("http://example.com/g"))?
.is_none());
Ok(())

@ -665,11 +665,11 @@ pub trait StrLookup {
fn contains_str(&self, key: &StrHash) -> Result<bool, Self::Error>;
}
pub fn insert_term<F: FnMut(&StrHash, &str)>(
pub fn insert_term<E, F: FnMut(&StrHash, &str) -> Result<(), E>>(
term: TermRef<'_>,
encoded: &EncodedTerm,
insert_str: &mut F,
) {
) -> Result<(), E> {
match term {
TermRef::NamedNode(node) => {
if let EncodedTerm::NamedNode { iri_id } = encoded {
@ -680,7 +680,7 @@ pub fn insert_term<F: FnMut(&StrHash, &str)>(
}
TermRef::BlankNode(node) => match encoded {
EncodedTerm::BigBlankNode { id_id } => insert_str(id_id, node.as_str()),
EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => (),
EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => Ok(()),
_ => unreachable!("Invalid term encoding {:?} for {}", encoded, term),
},
TermRef::Literal(literal) => match encoded {
@ -699,7 +699,7 @@ pub fn insert_term<F: FnMut(&StrHash, &str)>(
value_id,
language_id,
} => {
insert_str(value_id, literal.value());
insert_str(value_id, literal.value())?;
if let Some(language) = literal.language() {
insert_str(language_id, language)
} else {
@ -713,7 +713,7 @@ pub fn insert_term<F: FnMut(&StrHash, &str)>(
value_id,
datatype_id,
} => {
insert_str(value_id, literal.value());
insert_str(value_id, literal.value())?;
insert_str(datatype_id, literal.datatype().as_str())
}
EncodedTerm::SmallStringLiteral(..)
@ -733,17 +733,17 @@ pub fn insert_term<F: FnMut(&StrHash, &str)>(
| EncodedTerm::GMonthLiteral(..)
| EncodedTerm::DurationLiteral(..)
| EncodedTerm::YearMonthDurationLiteral(..)
| EncodedTerm::DayTimeDurationLiteral(..) => (),
| EncodedTerm::DayTimeDurationLiteral(..) => Ok(()),
_ => unreachable!("Invalid term encoding {:?} for {}", encoded, term),
},
TermRef::Triple(triple) => {
if let EncodedTerm::Triple(encoded) = encoded {
insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str);
insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str)?;
insert_term(
triple.predicate.as_ref().into(),
&encoded.predicate,
insert_str,
);
)?;
insert_term(triple.object.as_ref(), &encoded.object, insert_str)
} else {
unreachable!("Invalid term encoding {:?} for {}", encoded, term)
@ -752,7 +752,10 @@ pub fn insert_term<F: FnMut(&StrHash, &str)>(
}
}
pub fn remove_term<F: FnMut(&StrHash)>(encoded: &EncodedTerm, remove_str: &mut F) {
pub fn remove_term<E, F: FnMut(&StrHash) -> Result<(), E>>(
encoded: &EncodedTerm,
remove_str: &mut F,
) -> Result<(), E> {
match encoded {
EncodedTerm::NamedNode { iri_id } => remove_str(iri_id),
EncodedTerm::BigBlankNode { id_id } => remove_str(id_id),
@ -763,7 +766,7 @@ pub fn remove_term<F: FnMut(&StrHash)>(encoded: &EncodedTerm, remove_str: &mut F
value_id,
language_id,
} => {
remove_str(value_id);
remove_str(value_id)?;
remove_str(language_id)
}
EncodedTerm::SmallTypedLiteral { datatype_id, .. } => remove_str(datatype_id),
@ -771,15 +774,15 @@ pub fn remove_term<F: FnMut(&StrHash)>(encoded: &EncodedTerm, remove_str: &mut F
value_id,
datatype_id,
} => {
remove_str(value_id);
remove_str(value_id)?;
remove_str(datatype_id)
}
EncodedTerm::Triple(encoded) => {
remove_term(&encoded.subject, remove_str);
remove_term(&encoded.predicate, remove_str);
remove_term(&encoded.subject, remove_str)?;
remove_term(&encoded.predicate, remove_str)?;
remove_term(&encoded.object, remove_str)
}
_ => (),
_ => Ok(()),
}
}

@ -34,7 +34,7 @@ use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::BulkLoader;
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader};
use std::io::{BufRead, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
@ -44,6 +44,9 @@ use std::{fmt, io, str};
/// Allows to query and update it using SPARQL.
/// It is based on the [RocksDB](https://rocksdb.org/) key-value store.
///
/// This store ensure the "repeatable read" isolation level.
/// All operations are not able to read the result of concurrent
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
@ -157,14 +160,15 @@ impl Store {
object: Option<TermRef<'_>>,
graph_name: Option<GraphNameRef<'_>>,
) -> QuadIter {
let reader = self.storage.snapshot();
QuadIter {
iter: self.storage.quads_for_pattern(
iter: reader.quads_for_pattern(
subject.map(EncodedTerm::from).as_ref(),
predicate.map(EncodedTerm::from).as_ref(),
object.map(EncodedTerm::from).as_ref(),
graph_name.map(EncodedTerm::from).as_ref(),
),
storage: self.storage.clone(),
reader,
}
}
@ -176,19 +180,19 @@ impl Store {
/// Checks if this store contains a given quad
pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let quad = EncodedQuad::from(quad.into());
self.storage.contains(&quad)
self.storage.reader().contains(&quad)
}
/// Returns the number of quads in the store
///
/// Warning: this function executes a full scan
pub fn len(&self) -> io::Result<usize> {
self.storage.len()
self.storage.reader().len()
}
/// Returns if the store is empty
pub fn is_empty(&self) -> io::Result<bool> {
self.storage.is_empty()
self.storage.reader().is_empty()
}
/// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
@ -231,10 +235,9 @@ impl Store {
)
}
/// Loads a graph file (i.e. triples) into the store
/// Loads a graph file (i.e. triples) into the store.
///
/// Warning: This function is not atomic. If an error happens during the file parsing, only a
/// part of the file might be written to the store.
/// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`].
///
/// Usage example:
/// ```
@ -264,15 +267,14 @@ impl Store {
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
let mut writer = self.storage.simple_writer();
let mut writer = self.storage.transaction();
load_graph(&mut writer, reader, format, to_graph_name.into(), base_iri)?;
writer.commit()
}
/// Loads a dataset file (i.e. quads) into the store.
///
/// Warning: This function is not atomic. If an error happens during the file parsing, only a
/// part of the file might be written to the store.
/// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`].
///
/// Usage example:
/// ```
@ -301,7 +303,7 @@ impl Store {
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
let mut writer = self.storage.simple_writer();
let mut writer = self.storage.transaction();
load_dataset(&mut writer, reader, format, base_iri)?;
writer.commit()
}
@ -325,7 +327,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let mut writer = self.storage.atomic_writer();
let mut writer = self.storage.transaction();
let result = writer.insert(quad.into())?;
writer.commit()?;
Ok(result)
@ -351,7 +353,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let mut writer = self.storage.atomic_writer();
let mut writer = self.storage.transaction();
let result = writer.remove(quad.into())?;
writer.commit()?;
Ok(result)
@ -424,9 +426,10 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn named_graphs(&self) -> GraphNameIter {
let reader = self.storage.snapshot();
GraphNameIter {
iter: self.storage.named_graphs(),
store: self.clone(),
iter: reader.named_graphs(),
reader,
}
}
@ -448,7 +451,7 @@ impl Store {
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let graph_name = EncodedTerm::from(graph_name.into());
self.storage.contains_named_graph(&graph_name)
self.storage.reader().contains_named_graph(&graph_name)
}
/// Inserts a graph into this store
@ -471,7 +474,7 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let mut writer = self.storage.atomic_writer();
let mut writer = self.storage.transaction();
let result = writer.insert_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
@ -496,7 +499,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> io::Result<()> {
let mut writer = self.storage.simple_writer();
let mut writer = self.storage.transaction();
writer.clear_graph(graph_name.into())?;
writer.commit()
}
@ -525,7 +528,7 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let mut writer = self.storage.simple_writer();
let mut writer = self.storage.transaction();
let result = writer.remove_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
@ -549,7 +552,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear(&self) -> io::Result<()> {
let mut writer = self.storage.simple_writer();
let mut writer = self.storage.transaction();
writer.clear()?;
writer.commit()
}
@ -632,7 +635,7 @@ impl fmt::Display for Store {
/// An iterator returning the quads contained in a [`Store`].
pub struct QuadIter {
iter: ChainedDecodingQuadIterator,
storage: Storage,
reader: StorageReader,
}
impl Iterator for QuadIter {
@ -640,7 +643,7 @@ impl Iterator for QuadIter {
fn next(&mut self) -> Option<io::Result<Quad>> {
Some(match self.iter.next()? {
Ok(quad) => self.storage.decode_quad(&quad).map_err(|e| e.into()),
Ok(quad) => self.reader.decode_quad(&quad).map_err(|e| e.into()),
Err(error) => Err(error),
})
}
@ -649,7 +652,7 @@ impl Iterator for QuadIter {
/// An iterator returning the graph names contained in a [`Store`].
pub struct GraphNameIter {
iter: DecodingGraphIterator,
store: Store,
reader: StorageReader,
}
impl Iterator for GraphNameIter {
@ -657,9 +660,9 @@ impl Iterator for GraphNameIter {
fn next(&mut self) -> Option<io::Result<NamedOrBlankNode>> {
Some(
self.iter.next()?.and_then(|graph_name| {
Ok(self.store.storage.decode_named_or_blank_node(&graph_name)?)
}),
self.iter
.next()?
.and_then(|graph_name| Ok(self.reader.decode_named_or_blank_node(&graph_name)?)),
)
}

@ -149,6 +149,25 @@ fn test_dump_dataset() -> io::Result<()> {
Ok(())
}
#[test]
fn test_snapshot_isolation_iterator() -> io::Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let store = Store::new()?;
store.insert(quad)?;
let iter = store.iter();
store.remove(quad)?;
assert_eq!(
iter.collect::<io::Result<Vec<_>>>()?,
vec![quad.into_owned()]
);
Ok(())
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_backward_compatibility() -> io::Result<()> {

@ -42,39 +42,6 @@ void rocksdb_transactiondb_compact_range_cf_opt(rocksdb_transactiondb_t* db,
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
}
void rocksdb_transactiondb_write_writebatch_wi(
rocksdb_transactiondb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_writebatch_wi_t* wbwi,
char** errptr) {
WriteBatch* wb = wbwi->rep->GetWriteBatch();
SaveError(errptr, db->rep->Write(options->rep, wb));
}
char* rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_transactiondb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr) {
char* result = nullptr;
std::string tmp;
Status s = wbwi->rep->GetFromBatchAndDB(db->rep, options->rep, column_family->rep,
Slice(key, keylen), &tmp);
if (s.ok()) {
*vallen = tmp.size();
result = CopyString(tmp);
} else {
*vallen = 0;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
}
return result;
}
void rocksdb_transactiondb_ingest_external_file_cf(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
@ -86,8 +53,42 @@ void rocksdb_transactiondb_ingest_external_file_cf(
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
}
rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) {
return new rocksdb_writeoptions_t(*options);
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {
delete v;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = txn->rep->GetForUpdate(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {
delete v;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
rocksdb_readoptions_t* rocksdb_readoptions_create_copy(rocksdb_readoptions_t* options) {
return new rocksdb_readoptions_t(*options);
}
}

@ -20,28 +20,23 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt(
rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_write_writebatch_wi(
rocksdb_transactiondb_t* db,
const rocksdb_writeoptions_t* options,
rocksdb_writebatch_wi_t* wbwi,
char** errptr);
extern ROCKSDB_LIBRARY_API char* rocksdb_transactiondb_writebatch_wi_get_from_batch_and_db_cf(
rocksdb_writebatch_wi_t* wbwi,
rocksdb_transactiondb_t* db,
const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
size_t* vallen,
char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_file_cf(
rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* handle,
const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(
rocksdb_writeoptions_t*);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_readoptions_t* rocksdb_readoptions_create_copy(
rocksdb_readoptions_t*);
#ifdef __cplusplus
}

Loading…
Cancel
Save