diff --git a/lib/src/storage/backend/fallback.rs b/lib/src/storage/backend/fallback.rs index 2e98ae2e..4d8df9ad 100644 --- a/lib/src/storage/backend/fallback.rs +++ b/lib/src/storage/backend/fallback.rs @@ -12,6 +12,7 @@ pub struct ColumnFamilyDefinition { pub name: &'static str, pub use_iter: bool, pub min_prefix_size: usize, + pub unordered_writes: bool, } #[derive(Clone)] diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 851ba470..63cffd65 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -74,6 +74,7 @@ pub struct ColumnFamilyDefinition { pub name: &'static str, pub use_iter: bool, pub min_prefix_size: usize, + pub unordered_writes: bool, } #[derive(Clone)] @@ -208,6 +209,7 @@ impl Db { name: "default", use_iter: true, min_prefix_size: 0, + unordered_writes: false, }) } let column_family_names = column_families.iter().map(|c| c.name).collect::>(); @@ -229,6 +231,9 @@ impl Db { rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), ); } + if cf.unordered_writes { + rocksdb_options_set_unordered_write(options, 1); + } options }) .collect::>(); @@ -423,6 +428,55 @@ impl Db { } } + pub fn get( + &self, + column_family: &ColumnFamily, + key: &[u8], + ) -> Result, StorageError> { + unsafe { + let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( + self.0.db, + self.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len() + ))?; + Ok(if slice.is_null() { + None + } else { + Some(PinnableSlice(slice)) + }) + } + } + + pub fn contains_key( + &self, + column_family: &ColumnFamily, + key: &[u8], + ) -> Result { + Ok(self.get(column_family, key)?.is_some()) //TODO: optimize + } + + pub fn insert( + &self, + column_family: &ColumnFamily, + key: &[u8], + value: &[u8], + ) -> Result<(), StorageError> { + unsafe { + ffi_result!(rocksdb_transactiondb_put_cf_with_status( + self.0.db, + self.0.write_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + ))?; + } + Ok(()) + } + pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_flush_cf_with_status( diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index cdd2a37e..ab1c5cd1 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -76,56 +76,67 @@ impl Storage { name: ID2STR_CF, use_iter: false, min_prefix_size: 0, + unordered_writes: true, }, ColumnFamilyDefinition { name: SPOG_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: POSG_CF, use_iter: true, min_prefix_size: 17, // named node start + unordered_writes: false, }, ColumnFamilyDefinition { name: OSPG_CF, use_iter: true, min_prefix_size: 0, // There are small literals... + unordered_writes: false, }, ColumnFamilyDefinition { name: GSPO_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: GPOS_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: GOSP_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: DSPO_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: DPOS_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ColumnFamilyDefinition { name: DOSP_CF, use_iter: true, min_prefix_size: 0, // There are small literals... + unordered_writes: false, }, ColumnFamilyDefinition { name: GRAPHS_CF, use_iter: true, min_prefix_size: 17, // named or blank node start + unordered_writes: false, }, ] } @@ -194,7 +205,7 @@ impl Storage { #[cfg(not(target_arch = "wasm32"))] fn ensure_version(&self) -> Result { Ok( - if let Some(version) = self.db.snapshot().get(&self.default_cf, b"oxversion")? { + if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? { let mut buffer = [0; 8]; buffer.copy_from_slice(&version); u64::from_be_bytes(buffer) @@ -207,9 +218,8 @@ impl Storage { #[cfg(not(target_arch = "wasm32"))] fn update_version(&self, version: u64) -> Result<(), StorageError> { - self.db.transaction(|mut t| { - t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()) - })?; + self.db + .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; self.db.flush(&self.default_cf) } @@ -583,6 +593,18 @@ impl StorageReader { } } + #[cfg(not(target_arch = "wasm32"))] + pub fn get_str(&self, key: &StrHash) -> Result, StorageError> { + Ok(self + .storage + .db + .get(&self.storage.id2str_cf, &key.to_be_bytes())? + .map(|v| String::from_utf8(v.into())) + .transpose() + .map_err(CorruptionError::new)?) + } + + #[cfg(target_arch = "wasm32")] pub fn get_str(&self, key: &StrHash) -> Result, StorageError> { Ok(self .reader @@ -592,6 +614,14 @@ impl StorageReader { .map_err(CorruptionError::new)?) } + #[cfg(not(target_arch = "wasm32"))] + pub fn contains_str(&self, key: &StrHash) -> Result { + self.storage + .db + .contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) + } + + #[cfg(target_arch = "wasm32")] pub fn contains_str(&self, key: &StrHash) -> Result { self.reader .contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) @@ -819,6 +849,23 @@ impl<'a> StorageWriter<'a> { } } + #[cfg(not(target_arch = "wasm32"))] + fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { + if self + .storage + .db + .contains_key(&self.storage.id2str_cf, &key.to_be_bytes())? + { + return Ok(()); + } + self.storage.db.insert( + &self.storage.id2str_cf, + &key.to_be_bytes(), + value.as_bytes(), + ) + } + + #[cfg(target_arch = "wasm32")] fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { self.transaction.insert( &self.storage.id2str_cf, diff --git a/rocksdb-sys/api/c.cc b/rocksdb-sys/api/c.cc index bb1384ee..e5e3ec43 100644 --- a/rocksdb-sys/api/c.cc +++ b/rocksdb-sys/api/c.cc @@ -61,6 +61,16 @@ rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf_with_status( return v; } +void rocksdb_transactiondb_put_cf_with_status(rocksdb_transactiondb_t* txn_db, + const rocksdb_writeoptions_t* options, + rocksdb_column_family_handle_t* column_family, + const char* key, size_t keylen, + const char* val, size_t vallen, + rocksdb_status_t* statusptr) { + SaveStatus(statusptr, txn_db->rep->Put(options->rep, column_family->rep, + Slice(key, keylen), Slice(val, vallen))); +} + void rocksdb_transactiondb_flush_cf_with_status( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, diff --git a/rocksdb-sys/api/c.h b/rocksdb-sys/api/c.h index bafd5579..6ef2888e 100644 --- a/rocksdb-sys/api/c.h +++ b/rocksdb-sys/api/c.h @@ -78,6 +78,11 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi rocksdb_column_family_handle_t* column_family, const char* key, size_t keylen, rocksdb_status_t* statusptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_put_cf_with_status( + rocksdb_transactiondb_t* txn_db, const rocksdb_writeoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, const char* val, size_t vallen, rocksdb_status_t* statusptr); + extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr);