Adds a ColumnFamilyDefinition struct to store column family options

pull/171/head
Tpt 3 years ago
parent def7a3ce72
commit bf0f178c41
  1. 8
      lib/src/storage/fallback_backend.rs
  2. 29
      lib/src/storage/mod.rs
  3. 33
      lib/src/storage/rocksdb_backend.rs
  4. 1
      lib/src/store.rs

@ -4,14 +4,18 @@ use std::collections::BTreeMap;
use std::io::Result; use std::io::Result;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
pub struct ColumnFamilyDefinition {
pub name: &'static str,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>); pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>);
impl Db { impl Db {
pub fn new(column_families: &'static [&'static str]) -> Result<Self> { pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
let mut trees = BTreeMap::new(); let mut trees = BTreeMap::new();
for cf in column_families { for cf in column_families {
trees.insert(ColumnFamily(*cf), BTreeMap::default()); trees.insert(ColumnFamily(cf.name), BTreeMap::default());
} }
trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
Ok(Self(Arc::new(RwLock::new(trees)))) Ok(Self(Arc::new(RwLock::new(trees))))

@ -8,9 +8,9 @@ use crate::storage::binary_encoder::{
}; };
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder}; use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use fallback_backend::{ColumnFamily, Db, Iter}; use fallback_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use rocksdb_backend::{ColumnFamily, Db, Iter}; use rocksdb_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::path::Path; use std::path::Path;
@ -36,11 +36,6 @@ const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs"; const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default"; const DEFAULT_CF: &str = "default";
const COLUMN_FAMILIES: [&str; 11] = [
ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF, DSPO_CF, DPOS_CF, DOSP_CF,
GRAPHS_CF,
];
/// Low level storage primitives /// Low level storage primitives
#[derive(Clone)] #[derive(Clone)]
pub struct Storage { pub struct Storage {
@ -61,12 +56,28 @@ pub struct Storage {
impl Storage { impl Storage {
pub fn new() -> std::io::Result<Self> { pub fn new() -> std::io::Result<Self> {
Self::setup(Db::new(&COLUMN_FAMILIES)?) Self::setup(Db::new(Self::column_families())?)
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path) -> std::io::Result<Self> { pub fn open(path: &Path) -> std::io::Result<Self> {
Self::setup(Db::open(path, &COLUMN_FAMILIES)?) Self::setup(Db::open(path, Self::column_families())?)
}
fn column_families() -> Vec<ColumnFamilyDefinition> {
vec![
ColumnFamilyDefinition { name: ID2STR_CF },
ColumnFamilyDefinition { name: SPOG_CF },
ColumnFamilyDefinition { name: POSG_CF },
ColumnFamilyDefinition { name: OSPG_CF },
ColumnFamilyDefinition { name: GSPO_CF },
ColumnFamilyDefinition { name: GPOS_CF },
ColumnFamilyDefinition { name: GOSP_CF },
ColumnFamilyDefinition { name: DSPO_CF },
ColumnFamilyDefinition { name: DPOS_CF },
ColumnFamilyDefinition { name: DOSP_CF },
ColumnFamilyDefinition { name: GRAPHS_CF },
]
} }
fn setup(db: Db) -> std::io::Result<Self> { fn setup(db: Db) -> std::io::Result<Self> {

@ -37,6 +37,10 @@ macro_rules! ffi_result_impl {
}} }}
} }
pub struct ColumnFamilyDefinition {
pub name: &'static str,
}
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<DbHandler>); pub struct Db(Arc<DbHandler>);
@ -52,7 +56,7 @@ struct DbHandler {
low_priority_write_options: *mut rocksdb_writeoptions_t, low_priority_write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t, flush_options: *mut rocksdb_flushoptions_t,
env: Option<*mut rocksdb_env_t>, env: Option<*mut rocksdb_env_t>,
column_families: Vec<&'static str>, column_families: Vec<ColumnFamilyDefinition>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
} }
@ -77,7 +81,7 @@ impl Drop for DbHandler {
} }
impl Db { impl Db {
pub fn new(column_families: &'static [&'static str]) -> Result<Self> { pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
let path = if cfg!(target_os = "linux") { let path = if cfg!(target_os = "linux") {
"/dev/shm/".into() "/dev/shm/".into()
} else { } else {
@ -87,13 +91,13 @@ impl Db {
Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?)))
} }
pub fn open(path: &Path, column_families: &'static [&'static str]) -> Result<Self> { pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) Ok(Self(Arc::new(Self::do_open(path, column_families, false)?)))
} }
fn do_open( fn do_open(
path: &Path, path: &Path,
column_families: &'static [&'static str], mut column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool, in_memory: bool,
) -> Result<DbHandler> { ) -> Result<DbHandler> {
let c_path = CString::new( let c_path = CString::new(
@ -107,12 +111,16 @@ impl Db {
assert!(!options.is_null(), "rocksdb_options_create returned null"); assert!(!options.is_null(), "rocksdb_options_create returned null");
rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1); rocksdb_options_set_create_missing_column_families(options, 1);
if !in_memory {
rocksdb_options_set_compression( rocksdb_options_set_compression(
options, options,
rocksdb_lz4_compression.try_into().unwrap(), if in_memory {
); rocksdb_no_compression
} else {
rocksdb_lz4_compression
} }
.try_into()
.unwrap(),
);
let txn_options = rocksdb_transactiondb_options_create(); let txn_options = rocksdb_transactiondb_options_create();
assert!( assert!(
@ -133,13 +141,12 @@ impl Db {
None None
}; };
let mut column_families = column_families.to_vec(); if !column_families.iter().any(|c| c.name == "default") {
if !column_families.contains(&"default") { column_families.push(ColumnFamilyDefinition { name: "default" })
column_families.push("default")
} }
let c_column_families = column_families let c_column_families = column_families
.iter() .iter()
.map(|cf| CString::new(*cf)) .map(|cf| CString::new(cf.name))
.collect::<std::result::Result<Vec<_>, _>>() .collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?; .map_err(invalid_input_error)?;
let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()]; let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()];
@ -226,8 +233,8 @@ impl Db {
} }
pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> { pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
for (cf_name, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) { for (cf, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) {
if *cf_name == name { if cf.name == name {
return Some(ColumnFamily(*cf_handle)); return Some(ColumnFamily(*cf_handle));
} }
} }

@ -33,6 +33,7 @@ use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage}; use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
use std::io::{BufRead, Write}; use std::io::{BufRead, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path; use std::path::Path;
use std::{fmt, io, str}; use std::{fmt, io, str};

Loading…
Cancel
Save