Moves str2id outside of transactions

Allows avoiding conflicts and reducing transactional guarantees
pull/190/head
Tpt 3 years ago
parent df1478e931
commit a91ea89fff
  1. 1
      lib/src/storage/backend/fallback.rs
  2. 54
      lib/src/storage/backend/rocksdb.rs
  3. 55
      lib/src/storage/mod.rs
  4. 10
      rocksdb-sys/api/c.cc
  5. 5
      rocksdb-sys/api/c.h

@ -12,6 +12,7 @@ pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub use_iter: bool, pub use_iter: bool,
pub min_prefix_size: usize, pub min_prefix_size: usize,
pub unordered_writes: bool,
} }
#[derive(Clone)] #[derive(Clone)]

@ -74,6 +74,7 @@ pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub use_iter: bool, pub use_iter: bool,
pub min_prefix_size: usize, pub min_prefix_size: usize,
pub unordered_writes: bool,
} }
#[derive(Clone)] #[derive(Clone)]
@ -208,6 +209,7 @@ impl Db {
name: "default", name: "default",
use_iter: true, use_iter: true,
min_prefix_size: 0, min_prefix_size: 0,
unordered_writes: false,
}) })
} }
let column_family_names = column_families.iter().map(|c| c.name).collect::<Vec<_>>(); let column_family_names = column_families.iter().map(|c| c.name).collect::<Vec<_>>();
@ -229,6 +231,9 @@ impl Db {
rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size),
); );
} }
if cf.unordered_writes {
rocksdb_options_set_unordered_write(options, 1);
}
options options
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -423,6 +428,55 @@ impl Db {
} }
} }
pub fn get(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<Option<PinnableSlice>, StorageError> {
unsafe {
let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status(
self.0.db,
self.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len()
))?;
Ok(if slice.is_null() {
None
} else {
Some(PinnableSlice(slice))
})
}
}
pub fn contains_key(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<bool, StorageError> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_transactiondb_put_cf_with_status(
self.0.db,
self.0.write_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))?;
}
Ok(())
}
pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> {
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf_with_status( ffi_result!(rocksdb_transactiondb_flush_cf_with_status(

@ -76,56 +76,67 @@ impl Storage {
name: ID2STR_CF, name: ID2STR_CF,
use_iter: false, use_iter: false,
min_prefix_size: 0, min_prefix_size: 0,
unordered_writes: true,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: SPOG_CF, name: SPOG_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: POSG_CF, name: POSG_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named node start min_prefix_size: 17, // named node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: OSPG_CF, name: OSPG_CF,
use_iter: true, use_iter: true,
min_prefix_size: 0, // There are small literals... min_prefix_size: 0, // There are small literals...
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GSPO_CF, name: GSPO_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GPOS_CF, name: GPOS_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GOSP_CF, name: GOSP_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DSPO_CF, name: DSPO_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DPOS_CF, name: DPOS_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DOSP_CF, name: DOSP_CF,
use_iter: true, use_iter: true,
min_prefix_size: 0, // There are small literals... min_prefix_size: 0, // There are small literals...
unordered_writes: false,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GRAPHS_CF, name: GRAPHS_CF,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
unordered_writes: false,
}, },
] ]
} }
@ -194,7 +205,7 @@ impl Storage {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
fn ensure_version(&self) -> Result<u64, StorageError> { fn ensure_version(&self) -> Result<u64, StorageError> {
Ok( Ok(
if let Some(version) = self.db.snapshot().get(&self.default_cf, b"oxversion")? { if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
let mut buffer = [0; 8]; let mut buffer = [0; 8];
buffer.copy_from_slice(&version); buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer) u64::from_be_bytes(buffer)
@ -207,9 +218,8 @@ impl Storage {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
fn update_version(&self, version: u64) -> Result<(), StorageError> { fn update_version(&self, version: u64) -> Result<(), StorageError> {
self.db.transaction(|mut t| { self.db
t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()) .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
})?;
self.db.flush(&self.default_cf) self.db.flush(&self.default_cf)
} }
@ -583,6 +593,18 @@ impl StorageReader {
} }
} }
#[cfg(not(target_arch = "wasm32"))]
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self
.storage
.db
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
.map(|v| String::from_utf8(v.into()))
.transpose()
.map_err(CorruptionError::new)?)
}
#[cfg(target_arch = "wasm32")]
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> { pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self Ok(self
.reader .reader
@ -592,6 +614,14 @@ impl StorageReader {
.map_err(CorruptionError::new)?) .map_err(CorruptionError::new)?)
} }
#[cfg(not(target_arch = "wasm32"))]
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.storage
.db
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
}
#[cfg(target_arch = "wasm32")]
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> { pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.reader self.reader
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) .contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
@ -819,6 +849,23 @@ impl<'a> StorageWriter<'a> {
} }
} }
#[cfg(not(target_arch = "wasm32"))]
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
if self
.storage
.db
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())?
{
return Ok(());
}
self.storage.db.insert(
&self.storage.id2str_cf,
&key.to_be_bytes(),
value.as_bytes(),
)
}
#[cfg(target_arch = "wasm32")]
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
self.transaction.insert( self.transaction.insert(
&self.storage.id2str_cf, &self.storage.id2str_cf,

@ -61,6 +61,16 @@ rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf_with_status(
return v; return v;
} }
void rocksdb_transactiondb_put_cf_with_status(rocksdb_transactiondb_t* txn_db,
const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family,
const char* key, size_t keylen,
const char* val, size_t vallen,
rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn_db->rep->Put(options->rep, column_family->rep,
Slice(key, keylen), Slice(val, vallen)));
}
void rocksdb_transactiondb_flush_cf_with_status( void rocksdb_transactiondb_flush_cf_with_status(
rocksdb_transactiondb_t* db, rocksdb_transactiondb_t* db,
const rocksdb_flushoptions_t* options, const rocksdb_flushoptions_t* options,

@ -78,6 +78,11 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, rocksdb_status_t* statusptr); size_t keylen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_put_cf_with_status(
rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, const char* val, size_t vallen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr); rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr);

Loading…
Cancel
Save