diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index fc842a97..71d3f33d 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -138,11 +138,7 @@ impl Db { in_memory: bool, for_bulk_load: bool, ) -> Result { - let c_path = CString::new( - path.to_str() - .ok_or_else(|| invalid_input_error("The DB path is not valid UTF-8"))?, - ) - .map_err(invalid_input_error)?; + let c_path = path_to_cstring(path)?; unsafe { let options = rocksdb_options_create(); @@ -500,31 +496,23 @@ impl Db { pub fn new_sst_file(&self) -> Result { unsafe { + let path = self.0.path.join(random::().to_string()); let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options); - let cpath = CString::new( - self.0 - .path - .join(random::().to_string()) - .to_string_lossy() - .to_string(), - ) - .map_err(invalid_input_error)?; - ffi_result!(rocksdb_sstfilewriter_open(writer, cpath.as_ptr()))?; - Ok(SstFileWriter { writer, cpath }) + ffi_result!(rocksdb_sstfilewriter_open( + writer, + path_to_cstring(&path)?.as_ptr() + ))?; + Ok(SstFileWriter { writer, path }) } } - pub fn write_stt_files( - &self, - writers_with_cf: Vec<(&ColumnFamily, SstFileWriter)>, - ) -> Result<()> { - for (cf, writer) in writers_with_cf { + pub fn write_stt_files(&self, ssts_for_cf: Vec<(&ColumnFamily, PathBuf)>) -> Result<()> { + for (cf, path) in ssts_for_cf { unsafe { - ffi_result!(rocksdb_sstfilewriter_finish(writer.writer))?; ffi_result!(rocksdb_transactiondb_ingest_external_file_cf( self.0.db, cf.0, - &writer.cpath.as_ptr(), + &path_to_cstring(&path)?.as_ptr(), 1, self.0.ingest_external_file_options ))?; @@ -764,7 +752,7 @@ impl Iter { pub struct SstFileWriter { writer: *mut rocksdb_sstfilewriter_t, - cpath: CString, + path: PathBuf, } impl Drop for SstFileWriter { @@ -803,6 +791,13 @@ impl SstFileWriter { )) } } + + pub fn finish(self) -> Result { + unsafe { + ffi_result!(rocksdb_sstfilewriter_finish(self.writer))?; + } + Ok(self.path.clone()) + } } fn convert_error(ptr: *const c_char) -> Error { @@ -950,3 +945,11 @@ unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char let filter = &*(filter as *const CompactionFilter); filter.name.as_ptr() } + +fn path_to_cstring(path: &Path) -> Result { + CString::new( + path.to_str() + .ok_or_else(|| invalid_input_error("The DB path is not valid UTF-8"))?, + ) + .map_err(invalid_input_error) +} diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 133ec17e..3b31321f 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -9,8 +9,6 @@ use crate::storage::binary_encoder::{ use crate::storage::numeric_encoder::{ insert_term, remove_term, EncodedQuad, EncodedTerm, StrHash, StrLookup, }; -#[cfg(not(target_arch = "wasm32"))] -use backend::SstFileWriter; use backend::{ ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, MergeOperator, WriteBatchWithIndex, @@ -23,6 +21,7 @@ use std::io::Result; use std::mem::take; #[cfg(not(target_arch = "wasm32"))] use std::path::Path; +use std::path::PathBuf; mod backend; mod binary_encoder; @@ -1114,13 +1113,13 @@ impl BulkLoader { id2str_sst.merge(&k, &buffer)?; buffer.clear(); } - to_load.push((&self.storage.id2str_cf, id2str_sst)); + to_load.push((&self.storage.id2str_cf, id2str_sst.finish()?)); } if !self.triples.is_empty() { to_load.push(( &self.storage.dspo_cf, - self.add_keys_to_sst( + self.build_sst_for_keys( self.triples.iter().map(|quad| { encode_term_triple(&quad.subject, &quad.predicate, &quad.object) }), @@ -1128,7 +1127,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.dpos_cf, - self.add_keys_to_sst( + self.build_sst_for_keys( self.triples.iter().map(|quad| { encode_term_triple(&quad.predicate, &quad.object, &quad.subject) }), @@ -1136,7 +1135,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.dosp_cf, - self.add_keys_to_sst( + self.build_sst_for_keys( self.triples.iter().map(|quad| { encode_term_triple(&quad.object, &quad.subject, &quad.predicate) }), @@ -1149,12 +1148,12 @@ impl BulkLoader { let quads = take(&mut self.graphs); to_load.push(( &self.storage.graphs_cf, - self.add_keys_to_sst(quads.into_iter().map(|g| encode_term(&g)))?, + self.build_sst_for_keys(quads.into_iter().map(|g| encode_term(&g)))?, )); to_load.push(( &self.storage.gspo_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.graph_name, &quad.subject, @@ -1165,7 +1164,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.gpos_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.graph_name, &quad.object, @@ -1176,7 +1175,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.gosp_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.graph_name, &quad.object, @@ -1187,7 +1186,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.spog_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.subject, &quad.predicate, @@ -1198,7 +1197,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.posg_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.object, &quad.subject, @@ -1209,7 +1208,7 @@ impl BulkLoader { )); to_load.push(( &self.storage.ospg_cf, - self.add_keys_to_sst(self.quads.iter().map(|quad| { + self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.object, &quad.subject, @@ -1240,14 +1239,14 @@ impl BulkLoader { ) } - fn add_keys_to_sst(&self, values: impl Iterator>) -> Result { + fn build_sst_for_keys(&self, values: impl Iterator>) -> Result { let mut values = values.collect::>(); values.sort_unstable(); let mut sst = self.storage.db.new_sst_file()?; for t in values { sst.insert_empty(&t)?; } - Ok(sst) + sst.finish() } }