Returns a clean error on transaction read after commit

pull/171/head
Tpt 3 years ago
parent aaa730d684
commit 7b1c4e0ad5
  1. 43
      lib/src/storage/backend/rocksdb.rs
  2. 6
      lib/src/storage/mod.rs

@ -542,6 +542,9 @@ impl Reader {
key.len() key.len()
)), )),
InnerReader::Transaction(inner) => { InnerReader::Transaction(inner) => {
if inner.is_ended.get() {
return Err(invalid_input_error("The transaction is already ended"));
}
ffi_result!(rocksdb_transaction_get_pinned_cf( ffi_result!(rocksdb_transaction_get_pinned_cf(
inner.transaction, inner.transaction,
self.options, self.options,
@ -564,12 +567,12 @@ impl Reader {
} }
#[must_use] #[must_use]
pub fn iter(&self, column_family: &ColumnFamily) -> Iter { pub fn iter(&self, column_family: &ColumnFamily) -> Result<Iter> {
self.scan_prefix(column_family, &[]) self.scan_prefix(column_family, &[])
} }
#[must_use] #[must_use]
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter { pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Result<Iter> {
//We generate the upper bound //We generate the upper bound
let upper_bound = { let upper_bound = {
let mut bound = prefix.to_vec(); let mut bound = prefix.to_vec();
@ -608,11 +611,16 @@ impl Reader {
InnerReader::Snapshot(inner) => { InnerReader::Snapshot(inner) => {
rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0) rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0)
} }
InnerReader::Transaction(inner) => rocksdb_transaction_create_iterator_cf( InnerReader::Transaction(inner) => {
inner.transaction, if inner.is_ended.get() {
options, return Err(invalid_input_error("The transaction is already ended"));
column_family.0, }
), rocksdb_transaction_create_iterator_cf(
inner.transaction,
options,
column_family.0,
)
}
}; };
assert!(!iter.is_null(), "rocksdb_create_iterator returned null"); assert!(!iter.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() { if prefix.is_empty() {
@ -621,19 +629,19 @@ impl Reader {
rocksdb_iter_seek(iter, prefix.as_ptr() as *const c_char, prefix.len()); rocksdb_iter_seek(iter, prefix.as_ptr() as *const c_char, prefix.len());
} }
let is_currently_valid = rocksdb_iter_valid(iter) != 0; let is_currently_valid = rocksdb_iter_valid(iter) != 0;
Iter { Ok(Iter {
iter, iter,
options, options,
_upper_bound: upper_bound, _upper_bound: upper_bound,
_reader: self.clone(), _reader: self.clone(),
is_currently_valid, is_currently_valid,
} })
} }
} }
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> { pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
let mut count = 0; let mut count = 0;
let mut iter = self.iter(column_family); let mut iter = self.iter(column_family)?;
while iter.is_valid() { while iter.is_valid() {
count += 1; count += 1;
iter.next(); iter.next();
@ -643,7 +651,7 @@ impl Reader {
} }
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> { pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
let iter = self.iter(column_family); let iter = self.iter(column_family)?;
iter.status()?; // We makes sure there is no read problem iter.status()?; // We makes sure there is no read problem
Ok(!iter.is_valid()) Ok(!iter.is_valid())
} }
@ -1095,3 +1103,16 @@ fn path_to_cstring(path: &Path) -> Result<CString> {
) )
.map_err(invalid_input_error) .map_err(invalid_input_error)
} }
#[test]
fn test_transaction_read_after_commit() -> Result<()> {
let db = Db::new(vec![])?;
let cf = db.column_family("default").unwrap();
let mut tr = db.transaction();
let reader = tr.reader();
tr.insert(&cf, b"test", b"foo")?;
assert_eq!(reader.get(&cf, b"test")?.as_deref(), Some(b"foo".as_ref()));
tr.commit()?;
assert!(reader.get(&cf, b"test").is_err());
Ok(())
}

@ -244,7 +244,7 @@ impl Storage {
let mut transaction = this.db.transaction(); let mut transaction = this.db.transaction();
let reader = this.db.snapshot(); let reader = this.db.snapshot();
let mut size = 0; let mut size = 0;
let mut iter = reader.iter(&this.id2str_cf); let mut iter = reader.iter(&this.id2str_cf)?;
while let (Some(key), Some(value)) = (iter.key(), iter.value()) { while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4); let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&i32::MAX.to_be_bytes()); new_value.extend_from_slice(&i32::MAX.to_be_bytes());
@ -615,7 +615,7 @@ impl StorageReader {
pub fn named_graphs(&self) -> DecodingGraphIterator { pub fn named_graphs(&self) -> DecodingGraphIterator {
DecodingGraphIterator { DecodingGraphIterator {
iter: self.reader.iter(&self.storage.graphs_cf), iter: self.reader.iter(&self.storage.graphs_cf).unwrap(), //TODO: propagate error?
} }
} }
@ -667,7 +667,7 @@ impl StorageReader {
encoding: QuadEncoding, encoding: QuadEncoding,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
DecodingQuadIterator { DecodingQuadIterator {
iter: self.reader.scan_prefix(column_family, prefix), iter: self.reader.scan_prefix(column_family, prefix).unwrap(), // TODO: propagate error?
encoding, encoding,
} }
} }

Loading…
Cancel
Save