Allows to have secondary instances in memory

pull/417/head
Tpt 2 years ago committed by Thomas Tanon
parent 5f68cb3746
commit 26f4e2dc98
  1. 67
      lib/src/storage/backend/rocksdb.rs
  2. 15
      lib/src/storage/mod.rs
  3. 32
      lib/src/store.rs
  4. 9
      lib/tests/store.rs
  5. 14
      server/src/main.rs

@ -101,8 +101,8 @@ struct RwDbHandler {
column_family_names: Vec<&'static str>, column_family_names: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>, cf_options: Vec<*mut rocksdb_options_t>,
path: PathBuf,
in_memory: bool, in_memory: bool,
path: PathBuf,
} }
unsafe impl Send for RwDbHandler {} unsafe impl Send for RwDbHandler {}
@ -130,8 +130,9 @@ impl Drop for RwDbHandler {
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options); rocksdb_block_based_options_destroy(self.block_based_table_options);
} }
if self.in_memory && self.path.exists() { if self.in_memory {
remove_dir_all(&self.path).unwrap(); #[allow(clippy::let_underscore_must_use)]
let _ = remove_dir_all(&self.path);
} }
} }
} }
@ -144,6 +145,7 @@ struct RoDbHandler {
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>, cf_options: Vec<*mut rocksdb_options_t>,
is_secondary: bool, is_secondary: bool,
path_to_remove: Option<PathBuf>,
} }
unsafe impl Send for RoDbHandler {} unsafe impl Send for RoDbHandler {}
@ -163,34 +165,29 @@ impl Drop for RoDbHandler {
rocksdb_readoptions_destroy(self.read_options); rocksdb_readoptions_destroy(self.read_options);
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
} }
if let Some(path) = &self.path_to_remove {
#[allow(clippy::let_underscore_must_use)]
let _ = remove_dir_all(path);
}
} }
} }
impl Db { impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> { pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> {
let path = if cfg!(target_os = "linux") { Self::open_read_write(None, column_families)
"/dev/shm/".into()
} else {
temp_dir()
}
.join(format!("oxigraph-rocksdb-{}", random::<u128>()));
Self::open_read_write(path, column_families, true)
} }
pub fn open( pub fn open_read_write(
path: &Path, path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>, column_families: Vec<ColumnFamilyDefinition>,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
Self::open_read_write(path.into(), column_families, false) let (path, in_memory) = if let Some(path) = path {
} (path.to_path_buf(), false)
} else {
fn open_read_write( (tmp_path(), true)
path: PathBuf, };
column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool,
) -> Result<Self, StorageError> {
unsafe {
let c_path = path_to_cstring(&path)?; let c_path = path_to_cstring(&path)?;
unsafe {
let options = Self::db_options(true, in_memory)?; let options = Self::db_options(true, in_memory)?;
rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1); rocksdb_options_set_create_missing_column_families(options, 1);
@ -320,8 +317,8 @@ impl Db {
column_family_names, column_family_names,
cf_handles, cf_handles,
cf_options, cf_options,
path,
in_memory, in_memory,
path,
})), })),
}) })
} }
@ -329,12 +326,17 @@ impl Db {
pub fn open_secondary( pub fn open_secondary(
primary_path: &Path, primary_path: &Path,
secondary_path: &Path, secondary_path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>, column_families: Vec<ColumnFamilyDefinition>,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
unsafe {
let c_primary_path = path_to_cstring(primary_path)?; let c_primary_path = path_to_cstring(primary_path)?;
let c_secondary_path = path_to_cstring(secondary_path)?; let (secondary_path, in_memory) = if let Some(path) = secondary_path {
(path.to_path_buf(), false)
} else {
(tmp_path(), true)
};
let c_secondary_path = path_to_cstring(&secondary_path)?;
unsafe {
let options = Self::db_options(false, false)?; let options = Self::db_options(false, false)?;
let (column_family_names, c_column_family_names, cf_options) = let (column_family_names, c_column_family_names, cf_options) =
Self::column_families_names_and_options(column_families, options); Self::column_families_names_and_options(column_families, options);
@ -385,6 +387,11 @@ impl Db {
cf_handles, cf_handles,
cf_options, cf_options,
is_secondary: true, is_secondary: true,
path_to_remove: if in_memory {
Some(secondary_path)
} else {
None
},
})), })),
}) })
} }
@ -447,6 +454,7 @@ impl Db {
cf_handles, cf_handles,
cf_options, cf_options,
is_secondary: false, is_secondary: false,
path_to_remove: None,
})), })),
}) })
} }
@ -1428,3 +1436,12 @@ fn available_file_descriptors() -> io::Result<Option<u64>> {
fn available_file_descriptors() -> io::Result<Option<u64>> { fn available_file_descriptors() -> io::Result<Option<u64>> {
Ok(None) Ok(None)
} }
fn tmp_path() -> PathBuf {
if cfg!(target_os = "linux") {
"/dev/shm/".into()
} else {
temp_dir()
}
.join(format!("oxigraph-rocksdb-{}", random::<u128>()))
}

@ -87,17 +87,26 @@ impl Storage {
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
pub fn open(path: &Path) -> Result<Self, StorageError> { pub fn open(path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open(path, Self::column_families())?) Self::setup(Db::open_read_write(Some(path), Self::column_families())?)
} }
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
pub fn open_secondary( pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary(
primary_path,
None,
Self::column_families(),
)?)
}
#[cfg(not(target_family = "wasm"))]
pub fn open_persistent_secondary(
primary_path: &Path, primary_path: &Path,
secondary_path: &Path, secondary_path: &Path,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary( Self::setup(Db::open_secondary(
primary_path, primary_path,
secondary_path, Some(secondary_path),
Self::column_families(), Self::column_families(),
)?) )?)
} }

@ -106,26 +106,48 @@ impl Store {
}) })
} }
/// Opens a read-only clone of a running [`Store`]. /// Opens a read-only clone of a running read-write [`Store`].
///
/// Changes done while this process is running will be replicated after a possible lag.
///
/// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
/// `primary_path` must be the path of the primary instance.
/// This secondary instance will use temporary storage for the secondary instance cache.
/// If you prefer persistent storage use [`Store::open_persistent_secondary`].
///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(not(target_family = "wasm"))]
pub fn open_secondary(primary_path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::open_secondary(primary_path.as_ref())?,
})
}
/// Opens a read-only clone of a running read-write [`Store`] with persistence of the secondary instance cache.
///
/// Changes done while this process is running will be replicated after a possible lag.
/// ///
/// It should only be used if a primary instance opened with [`Store::open`] is running at the same time. /// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
/// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache. /// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache.
/// ///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`]. /// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
pub fn open_secondary( pub fn open_persistent_secondary(
primary_path: impl AsRef<Path>, primary_path: impl AsRef<Path>,
secondary_path: impl AsRef<Path>, secondary_path: impl AsRef<Path>,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
Ok(Self { Ok(Self {
storage: Storage::open_secondary(primary_path.as_ref(), secondary_path.as_ref())?, storage: Storage::open_persistent_secondary(
primary_path.as_ref(),
secondary_path.as_ref(),
)?,
}) })
} }
/// Opens a read-only [`Store`] from disk. /// Opens a read-only [`Store`] from disk.
/// ///
/// It should not be already opened in write mode. /// Opening as read-only while having an other process writing the database is undefined behavior.
/// If you want to do so, use [`Store::open_secondary`]. /// [`Store::open_secondary`] should be used in this case.
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, StorageError> { pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self { Ok(Self {

@ -312,14 +312,13 @@ fn test_backup() -> Result<(), Box<dyn Error>> {
GraphNameRef::DefaultGraph, GraphNameRef::DefaultGraph,
); );
let store_dir = TempDir::default(); let store_dir = TempDir::default();
let secondary_store_dir = TempDir::default();
let backup_from_rw_dir = TempDir::default(); let backup_from_rw_dir = TempDir::default();
let backup_from_ro_dir = TempDir::default(); let backup_from_ro_dir = TempDir::default();
let backup_from_secondary_dir = TempDir::default(); let backup_from_secondary_dir = TempDir::default();
let store = Store::open(&store_dir)?; let store = Store::open(&store_dir)?;
store.insert(quad)?; store.insert(quad)?;
let secondary_store = Store::open_secondary(&store_dir, secondary_store_dir)?; let secondary_store = Store::open_secondary(&store_dir)?;
store.flush()?; store.flush()?;
store.backup(&backup_from_rw_dir)?; store.backup(&backup_from_rw_dir)?;
@ -396,11 +395,10 @@ fn test_secondary() -> Result<(), Box<dyn Error>> {
GraphNameRef::DefaultGraph, GraphNameRef::DefaultGraph,
); );
let primary_dir = TempDir::default(); let primary_dir = TempDir::default();
let secondary_dir = TempDir::default();
// We open the store // We open the store
let primary = Store::open(&primary_dir)?; let primary = Store::open(&primary_dir)?;
let secondary = Store::open_secondary(&primary_dir, &secondary_dir)?; let secondary = Store::open_secondary(&primary_dir)?;
// We insert a quad // We insert a quad
primary.insert(quad)?; primary.insert(quad)?;
@ -434,12 +432,11 @@ fn test_secondary() -> Result<(), Box<dyn Error>> {
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> { fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> {
let primary_dir = TempDir::default(); let primary_dir = TempDir::default();
let secondary_dir = TempDir::default();
create_dir(&primary_dir.0)?; create_dir(&primary_dir.0)?;
{ {
File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?; File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?;
} }
assert!(Store::open_secondary(&primary_dir, &secondary_dir).is_err()); assert!(Store::open_secondary(&primary_dir).is_err());
Ok(()) Ok(())
} }

@ -69,18 +69,20 @@ enum Command {
}, },
/// Start Oxigraph HTTP server in secondary mode. /// Start Oxigraph HTTP server in secondary mode.
/// ///
/// It allows to read the database while other processes are accessing it. /// It allows to read the database while an other process is writing it.
/// Changes done while this process is running will be replicated after a possible lag. /// Changes done while this process is running will be replicated after a possible lag.
/// ///
/// Beware: RocksDB secondary mode does not support snapshots and transactions. /// Beware: RocksDB secondary mode does not support snapshots and transactions.
/// Dirty reads might happen. /// Dirty reads might happen.
ServeSecondary { ServeSecondary {
/// Directory where the primary Oxigraph instance is writing to. /// Directory where the primary Oxigraph instance is writing to.
#[arg(long)] #[arg(long, alias = "location", short_alias = 'l')]
primary_location: PathBuf, primary_location: PathBuf,
/// Directory to which the current secondary instance might write to. /// Directory to which the current secondary instance might write to.
///
/// By default, temporary storage is used.
#[arg(long)] #[arg(long)]
secondary_location: PathBuf, secondary_location: Option<PathBuf>,
/// Host and port to listen to. /// Host and port to listen to.
#[arg(short, long, default_value = "localhost:7878")] #[arg(short, long, default_value = "localhost:7878")]
bind: String, bind: String,
@ -163,7 +165,11 @@ pub fn main() -> anyhow::Result<()> {
secondary_location, secondary_location,
bind, bind,
} => serve( } => serve(
Store::open_secondary(primary_location, secondary_location)?, if let Some(secondary_location) = secondary_location {
Store::open_persistent_secondary(primary_location, secondary_location)
} else {
Store::open_secondary(primary_location)
}?,
bind, bind,
false, false,
), ),

Loading…
Cancel
Save