RocksDB: Adds low priority writes

Allows RocksDB to defer garbage collection if useful
pull/171/head
Tpt 3 years ago
parent f68d747308
commit def7a3ce72
  1. 24
      lib/src/storage/fallback_backend.rs
  2. 71
      lib/src/storage/mod.rs
  3. 52
      lib/src/storage/rocksdb_backend.rs
  4. 4
      rocksdb-sys/api/c.cc
  5. 3
      rocksdb-sys/api/c.h

@ -51,7 +51,13 @@ impl Db {
.contains_key(key.as_ref()))
}
pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
_low_priority: bool,
) -> Result<()> {
self.0
.write()
.unwrap()
@ -61,11 +67,21 @@ impl Db {
Ok(())
}
pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.insert(column_family, key, &[])
pub fn insert_empty(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
self.insert(column_family, key, &[], low_priority)
}
pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
pub fn remove(
&self,
column_family: &ColumnFamily,
key: &[u8],
_low_priority: bool,
) -> Result<bool> {
Ok(self
.0
.write()

@ -93,7 +93,7 @@ impl Storage {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
this.db
.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?;
.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?;
}
}
version = 1;
@ -107,7 +107,7 @@ impl Storage {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes());
new_value.extend_from_slice(value);
this.db.insert(&this.id2str_cf, key, &new_value)?;
this.db.insert(&this.id2str_cf, key, &new_value, false)?;
iter.next();
}
iter.status()?;
@ -143,8 +143,12 @@ impl Storage {
}
fn set_version(&self, version: u64) -> std::io::Result<()> {
self.db
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
self.db.insert(
&self.default_cf,
b"oxversion",
&version.to_be_bytes(),
false,
)?;
Ok(())
}
@ -471,15 +475,18 @@ impl Storage {
} else {
self.insert_quad_triple(quad, &encoded)?;
self.db.insert_empty(&self.dspo_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.dspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_pos_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.dpos_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.dpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_osp_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.dosp_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.dosp_cf, buffer.as_slice(), false)?;
buffer.clear();
true
@ -491,32 +498,38 @@ impl Storage {
} else {
self.insert_quad_triple(quad, &encoded)?;
self.db.insert_empty(&self.spog_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.spog_cf, buffer.as_slice(), false)?;
buffer.clear();
write_posg_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.posg_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.posg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_ospg_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.ospg_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.ospg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gspo_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.gspo_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.gspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gpos_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.gpos_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.gpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gosp_quad(&mut buffer, &encoded);
self.db.insert_empty(&self.gosp_cf, buffer.as_slice())?;
self.db
.insert_empty(&self.gosp_cf, buffer.as_slice(), false)?;
buffer.clear();
write_term(&mut buffer, &encoded.graph_name);
if !self.db.contains_key(&self.graphs_cf, &buffer)? {
self.db.insert_empty(&self.graphs_cf, &buffer)?;
self.db.insert_empty(&self.graphs_cf, &buffer, false)?;
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
@ -537,15 +550,15 @@ impl Storage {
write_spo_quad(&mut buffer, quad);
if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? {
self.db.remove(&self.dspo_cf, buffer.as_slice())?;
self.db.remove(&self.dspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_pos_quad(&mut buffer, quad);
self.db.remove(&self.dpos_cf, buffer.as_slice())?;
self.db.remove(&self.dpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_osp_quad(&mut buffer, quad);
self.db.remove(&self.dosp_cf, buffer.as_slice())?;
self.db.remove(&self.dosp_cf, buffer.as_slice(), false)?;
buffer.clear();
self.remove_quad_triple(quad)?;
@ -558,27 +571,27 @@ impl Storage {
write_spog_quad(&mut buffer, quad);
if self.db.contains_key(&self.spog_cf, buffer.as_slice())? {
self.db.remove(&self.spog_cf, buffer.as_slice())?;
self.db.remove(&self.spog_cf, buffer.as_slice(), false)?;
buffer.clear();
write_posg_quad(&mut buffer, quad);
self.db.remove(&self.posg_cf, buffer.as_slice())?;
self.db.remove(&self.posg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
self.db.remove(&self.ospg_cf, buffer.as_slice())?;
self.db.remove(&self.ospg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
self.db.remove(&self.gspo_cf, buffer.as_slice())?;
self.db.remove(&self.gspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
self.db.remove(&self.gpos_cf, buffer.as_slice())?;
self.db.remove(&self.gpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
self.db.remove(&self.gosp_cf, buffer.as_slice())?;
self.db.remove(&self.gosp_cf, buffer.as_slice(), false)?;
buffer.clear();
self.remove_quad_triple(quad)?;
@ -596,7 +609,7 @@ impl Storage {
Ok(if self.db.contains_key(&self.graphs_cf, &encoded)? {
false
} else {
self.db.insert_empty(&self.graphs_cf, &encoded)?;
self.db.insert_empty(&self.graphs_cf, &encoded, false)?;
self.insert_term(graph_name.into(), &encoded_graph_name)?;
true
})
@ -633,7 +646,7 @@ impl Storage {
}
let encoded_graph = encode_term(graph_name);
Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? {
self.db.remove(&self.graphs_cf, &encoded_graph)?;
self.db.remove(&self.graphs_cf, &encoded_graph, false)?;
self.remove_term(graph_name)?;
true
} else {
@ -756,13 +769,13 @@ impl TermEncoder for Storage {
let new_number = number.saturating_add(1);
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value)?
.insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?
} else {
let mut buffer = Vec::with_capacity(value.len() + 4);
buffer.extend_from_slice(&1_u32.to_be_bytes());
buffer.extend_from_slice(value.as_bytes());
self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &buffer)?;
.insert(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)?;
}
Ok(())
}
@ -772,12 +785,12 @@ impl TermEncoder for Storage {
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_sub(1);
if new_number == 0 {
self.db.remove(&self.id2str_cf, &key.to_be_bytes())?;
self.db.remove(&self.id2str_cf, &key.to_be_bytes(), true)?;
} else {
let mut value = value.to_vec();
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value)?;
.insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?;
}
}
Ok(())

@ -49,6 +49,7 @@ struct DbHandler {
txn_options: *mut rocksdb_transactiondb_options_t,
read_options: *mut rocksdb_readoptions_t,
write_options: *mut rocksdb_writeoptions_t,
low_priority_write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t,
env: Option<*mut rocksdb_env_t>,
column_families: Vec<&'static str>,
@ -64,6 +65,7 @@ impl Drop for DbHandler {
rocksdb_transactiondb_close(self.db);
rocksdb_readoptions_destroy(self.read_options);
rocksdb_writeoptions_destroy(self.write_options);
rocksdb_writeoptions_destroy(self.low_priority_write_options);
rocksdb_flushoptions_destroy(self.flush_options);
rocksdb_transactiondb_options_destroy(self.txn_options);
rocksdb_options_destroy(self.options);
@ -185,17 +187,26 @@ impl Db {
!read_options.is_null(),
"rocksdb_readoptions_create returned null"
);
let write_options = rocksdb_writeoptions_create();
assert!(
!read_options.is_null(),
!write_options.is_null(),
"rocksdb_writeoptions_create returned null"
);
if in_memory {
rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL
}
let low_priority_write_options = rocksdb_writeoptions_create_copy(write_options);
assert!(
!low_priority_write_options.is_null(),
"rocksdb_writeoptions_create_copy returned null"
);
rocksdb_writeoptions_set_low_pri(low_priority_write_options, 1);
let flush_options = rocksdb_flushoptions_create();
assert!(
!options.is_null(),
!flush_options.is_null(),
"rocksdb_flushoptions_create returned null"
);
@ -205,6 +216,7 @@ impl Db {
txn_options,
read_options,
write_options,
low_priority_write_options,
flush_options,
env,
column_families,
@ -254,11 +266,21 @@ impl Db {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
low_priority: bool,
) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_put_cf(
self.0.db,
self.0.write_options,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
@ -268,15 +290,29 @@ impl Db {
}
}
pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.insert(column_family, key, &[])
pub fn insert_empty(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
self.insert(column_family, key, &[], low_priority)
}
pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
pub fn remove(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_delete_cf(
self.0.db,
self.0.write_options,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len()

@ -27,4 +27,8 @@ void rocksdb_transactiondb_flush(
SaveError(errptr, db->rep->Flush(options->rep));
}
rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) {
return new rocksdb_writeoptions_t(*options);
}
}

@ -15,6 +15,9 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(
rocksdb_writeoptions_t*);
#ifdef __cplusplus
}
#endif

Loading…
Cancel
Save