diff --git a/lib/src/storage/fallback_backend.rs b/lib/src/storage/fallback_backend.rs index 626dc001..7b4ae2bc 100644 --- a/lib/src/storage/fallback_backend.rs +++ b/lib/src/storage/fallback_backend.rs @@ -51,7 +51,13 @@ impl Db { .contains_key(key.as_ref())) } - pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + pub fn insert( + &self, + column_family: &ColumnFamily, + key: &[u8], + value: &[u8], + _low_priority: bool, + ) -> Result<()> { self.0 .write() .unwrap() @@ -61,11 +67,21 @@ impl Db { Ok(()) } - pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> { - self.insert(column_family, key, &[]) + pub fn insert_empty( + &self, + column_family: &ColumnFamily, + key: &[u8], + low_priority: bool, + ) -> Result<()> { + self.insert(column_family, key, &[], low_priority) } - pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result { + pub fn remove( + &self, + column_family: &ColumnFamily, + key: &[u8], + _low_priority: bool, + ) -> Result { Ok(self .0 .write() diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index ff8af110..60fad3fc 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -93,7 +93,7 @@ impl Storage { let quad = quad?; if !quad.graph_name.is_default_graph() { this.db - .insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?; + .insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?; } } version = 1; @@ -107,7 +107,7 @@ impl Storage { let mut new_value = Vec::with_capacity(value.len() + 4); new_value.extend_from_slice(&u32::MAX.to_be_bytes()); new_value.extend_from_slice(value); - this.db.insert(&this.id2str_cf, key, &new_value)?; + this.db.insert(&this.id2str_cf, key, &new_value, false)?; iter.next(); } iter.status()?; @@ -143,8 +143,12 @@ impl Storage { } fn set_version(&self, version: u64) -> std::io::Result<()> { - self.db - .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; + self.db.insert( + &self.default_cf, + b"oxversion", + &version.to_be_bytes(), + false, + )?; Ok(()) } @@ -471,15 +475,18 @@ impl Storage { } else { self.insert_quad_triple(quad, &encoded)?; - self.db.insert_empty(&self.dspo_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.dspo_cf, buffer.as_slice(), false)?; buffer.clear(); write_pos_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.dpos_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.dpos_cf, buffer.as_slice(), false)?; buffer.clear(); write_osp_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.dosp_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.dosp_cf, buffer.as_slice(), false)?; buffer.clear(); true @@ -491,32 +498,38 @@ impl Storage { } else { self.insert_quad_triple(quad, &encoded)?; - self.db.insert_empty(&self.spog_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.spog_cf, buffer.as_slice(), false)?; buffer.clear(); write_posg_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.posg_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.posg_cf, buffer.as_slice(), false)?; buffer.clear(); write_ospg_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.ospg_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.ospg_cf, buffer.as_slice(), false)?; buffer.clear(); write_gspo_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.gspo_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.gspo_cf, buffer.as_slice(), false)?; buffer.clear(); write_gpos_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.gpos_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.gpos_cf, buffer.as_slice(), false)?; buffer.clear(); write_gosp_quad(&mut buffer, &encoded); - self.db.insert_empty(&self.gosp_cf, buffer.as_slice())?; + self.db + .insert_empty(&self.gosp_cf, buffer.as_slice(), false)?; buffer.clear(); write_term(&mut buffer, &encoded.graph_name); if !self.db.contains_key(&self.graphs_cf, &buffer)? { - self.db.insert_empty(&self.graphs_cf, &buffer)?; + self.db.insert_empty(&self.graphs_cf, &buffer, false)?; self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; } buffer.clear(); @@ -537,15 +550,15 @@ impl Storage { write_spo_quad(&mut buffer, quad); if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? { - self.db.remove(&self.dspo_cf, buffer.as_slice())?; + self.db.remove(&self.dspo_cf, buffer.as_slice(), false)?; buffer.clear(); write_pos_quad(&mut buffer, quad); - self.db.remove(&self.dpos_cf, buffer.as_slice())?; + self.db.remove(&self.dpos_cf, buffer.as_slice(), false)?; buffer.clear(); write_osp_quad(&mut buffer, quad); - self.db.remove(&self.dosp_cf, buffer.as_slice())?; + self.db.remove(&self.dosp_cf, buffer.as_slice(), false)?; buffer.clear(); self.remove_quad_triple(quad)?; @@ -558,27 +571,27 @@ impl Storage { write_spog_quad(&mut buffer, quad); if self.db.contains_key(&self.spog_cf, buffer.as_slice())? { - self.db.remove(&self.spog_cf, buffer.as_slice())?; + self.db.remove(&self.spog_cf, buffer.as_slice(), false)?; buffer.clear(); write_posg_quad(&mut buffer, quad); - self.db.remove(&self.posg_cf, buffer.as_slice())?; + self.db.remove(&self.posg_cf, buffer.as_slice(), false)?; buffer.clear(); write_ospg_quad(&mut buffer, quad); - self.db.remove(&self.ospg_cf, buffer.as_slice())?; + self.db.remove(&self.ospg_cf, buffer.as_slice(), false)?; buffer.clear(); write_gspo_quad(&mut buffer, quad); - self.db.remove(&self.gspo_cf, buffer.as_slice())?; + self.db.remove(&self.gspo_cf, buffer.as_slice(), false)?; buffer.clear(); write_gpos_quad(&mut buffer, quad); - self.db.remove(&self.gpos_cf, buffer.as_slice())?; + self.db.remove(&self.gpos_cf, buffer.as_slice(), false)?; buffer.clear(); write_gosp_quad(&mut buffer, quad); - self.db.remove(&self.gosp_cf, buffer.as_slice())?; + self.db.remove(&self.gosp_cf, buffer.as_slice(), false)?; buffer.clear(); self.remove_quad_triple(quad)?; @@ -596,7 +609,7 @@ impl Storage { Ok(if self.db.contains_key(&self.graphs_cf, &encoded)? { false } else { - self.db.insert_empty(&self.graphs_cf, &encoded)?; + self.db.insert_empty(&self.graphs_cf, &encoded, false)?; self.insert_term(graph_name.into(), &encoded_graph_name)?; true }) @@ -633,7 +646,7 @@ impl Storage { } let encoded_graph = encode_term(graph_name); Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? { - self.db.remove(&self.graphs_cf, &encoded_graph)?; + self.db.remove(&self.graphs_cf, &encoded_graph, false)?; self.remove_term(graph_name)?; true } else { @@ -756,13 +769,13 @@ impl TermEncoder for Storage { let new_number = number.saturating_add(1); value[..4].copy_from_slice(&new_number.to_be_bytes()); self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &value)? + .insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)? } else { let mut buffer = Vec::with_capacity(value.len() + 4); buffer.extend_from_slice(&1_u32.to_be_bytes()); buffer.extend_from_slice(value.as_bytes()); self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &buffer)?; + .insert(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)?; } Ok(()) } @@ -772,12 +785,12 @@ impl TermEncoder for Storage { let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); let new_number = number.saturating_sub(1); if new_number == 0 { - self.db.remove(&self.id2str_cf, &key.to_be_bytes())?; + self.db.remove(&self.id2str_cf, &key.to_be_bytes(), true)?; } else { let mut value = value.to_vec(); value[..4].copy_from_slice(&new_number.to_be_bytes()); self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &value)?; + .insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?; } } Ok(()) diff --git a/lib/src/storage/rocksdb_backend.rs b/lib/src/storage/rocksdb_backend.rs index 11f447bb..5b170992 100644 --- a/lib/src/storage/rocksdb_backend.rs +++ b/lib/src/storage/rocksdb_backend.rs @@ -49,6 +49,7 @@ struct DbHandler { txn_options: *mut rocksdb_transactiondb_options_t, read_options: *mut rocksdb_readoptions_t, write_options: *mut rocksdb_writeoptions_t, + low_priority_write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, env: Option<*mut rocksdb_env_t>, column_families: Vec<&'static str>, @@ -64,6 +65,7 @@ impl Drop for DbHandler { rocksdb_transactiondb_close(self.db); rocksdb_readoptions_destroy(self.read_options); rocksdb_writeoptions_destroy(self.write_options); + rocksdb_writeoptions_destroy(self.low_priority_write_options); rocksdb_flushoptions_destroy(self.flush_options); rocksdb_transactiondb_options_destroy(self.txn_options); rocksdb_options_destroy(self.options); @@ -185,17 +187,26 @@ impl Db { !read_options.is_null(), "rocksdb_readoptions_create returned null" ); + let write_options = rocksdb_writeoptions_create(); assert!( - !read_options.is_null(), + !write_options.is_null(), "rocksdb_writeoptions_create returned null" ); if in_memory { rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL } + + let low_priority_write_options = rocksdb_writeoptions_create_copy(write_options); + assert!( + !low_priority_write_options.is_null(), + "rocksdb_writeoptions_create_copy returned null" + ); + rocksdb_writeoptions_set_low_pri(low_priority_write_options, 1); + let flush_options = rocksdb_flushoptions_create(); assert!( - !options.is_null(), + !flush_options.is_null(), "rocksdb_flushoptions_create returned null" ); @@ -205,6 +216,7 @@ impl Db { txn_options, read_options, write_options, + low_priority_write_options, flush_options, env, column_families, @@ -254,11 +266,21 @@ impl Db { Ok(self.get(column_family, key)?.is_some()) //TODO: optimize } - pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { + pub fn insert( + &self, + column_family: &ColumnFamily, + key: &[u8], + value: &[u8], + low_priority: bool, + ) -> Result<()> { unsafe { ffi_result!(rocksdb_transactiondb_put_cf( self.0.db, - self.0.write_options, + if low_priority { + self.0.low_priority_write_options + } else { + self.0.write_options + }, column_family.0, key.as_ptr() as *const c_char, key.len(), @@ -268,15 +290,29 @@ impl Db { } } - pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> { - self.insert(column_family, key, &[]) + pub fn insert_empty( + &self, + column_family: &ColumnFamily, + key: &[u8], + low_priority: bool, + ) -> Result<()> { + self.insert(column_family, key, &[], low_priority) } - pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> { + pub fn remove( + &self, + column_family: &ColumnFamily, + key: &[u8], + low_priority: bool, + ) -> Result<()> { unsafe { ffi_result!(rocksdb_transactiondb_delete_cf( self.0.db, - self.0.write_options, + if low_priority { + self.0.low_priority_write_options + } else { + self.0.write_options + }, column_family.0, key.as_ptr() as *const c_char, key.len() diff --git a/rocksdb-sys/api/c.cc b/rocksdb-sys/api/c.cc index 0c51fce3..87da6324 100644 --- a/rocksdb-sys/api/c.cc +++ b/rocksdb-sys/api/c.cc @@ -27,4 +27,8 @@ void rocksdb_transactiondb_flush( SaveError(errptr, db->rep->Flush(options->rep)); } +rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) { + return new rocksdb_writeoptions_t(*options); +} + } diff --git a/rocksdb-sys/api/c.h b/rocksdb-sys/api/c.h index b97b760a..916ccb48 100644 --- a/rocksdb-sys/api/c.h +++ b/rocksdb-sys/api/c.h @@ -15,6 +15,9 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr); +extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy( + rocksdb_writeoptions_t*); + #ifdef __cplusplus } #endif