diff --git a/ng-broker/src/broker_storage/wallet.rs b/ng-broker/src/broker_storage/wallet.rs index 229e386..2ecfb5a 100644 --- a/ng-broker/src/broker_storage/wallet.rs +++ b/ng-broker/src/broker_storage/wallet.rs @@ -109,9 +109,9 @@ impl<'a> Wallet<'a> { })?; Ok(result.unwrap()) } - pub fn get_or_create_peers_key(&self) -> Result { - self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec()) - } + // pub fn get_or_create_peers_key(&self) -> Result { + // self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec()) + // } pub fn get_or_create_blocks_key(&self) -> Result { self.get_or_create_single_key(Self::PREFIX, &Self::KEY_BLOCKS.to_vec()) } diff --git a/ng-broker/src/server_storage.rs b/ng-broker/src/server_storage.rs index 2035267..a15cc2a 100644 --- a/ng-broker/src/server_storage.rs +++ b/ng-broker/src/server_storage.rs @@ -31,7 +31,7 @@ use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; pub struct RocksDbServerStorage { wallet_storage: RocksDbKCVStorage, accounts_storage: RocksDbKCVStorage, - peers_storage: RocksDbKCVStorage, + //peers_storage: RocksDbKCVStorage, peers_last_seq_path: PathBuf, peers_last_seq: Mutex>, block_storage: RocksDbBlockStorage, @@ -93,13 +93,13 @@ impl RocksDbServerStorage { RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; // create/open the PEERS storage - log_debug!("opening peers DB"); - let peers_key = wallet.get_or_create_peers_key()?; - let mut peers_path = path.clone(); - peers_path.push("peers"); - std::fs::create_dir_all(peers_path.clone()).unwrap(); - //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way - let peers_storage = RocksDbKCVStorage::open(&peers_path, peers_key.slice().clone())?; + // log_debug!("opening peers DB"); + // let peers_key = wallet.get_or_create_peers_key()?; + // let mut peers_path = path.clone(); + // peers_path.push("peers"); + // std::fs::create_dir_all(peers_path.clone()).unwrap(); + // //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way + // let peers_storage = RocksDbKCVStorage::open(&peers_path, peers_key.slice().clone())?; // creates the path for peers_last_seq let mut peers_last_seq_path = path.clone(); @@ -125,7 +125,7 @@ impl RocksDbServerStorage { Ok(RocksDbServerStorage { wallet_storage, accounts_storage, - peers_storage, + //peers_storage, peers_last_seq_path, peers_last_seq: Mutex::new(HashMap::new()), block_storage, diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 90b71c0..b55f04a 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -485,18 +485,18 @@ pub type BranchWriteCapSecret = PrivKey; // IDENTITY, SITE, STORE, OVERLAY common types // -/// List of Identity types -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub enum Identity { - OrgSite(PubKey), - IndividualSite(PubKey), - OrgPublicStore(PubKey), - OrgProtectedStore(PubKey), - OrgPrivateStore(PubKey), - IndividualPublicStore(PubKey), - IndividualProtectedStore(PubKey), - IndividualPrivateStore(PubKey), -} +// /// List of Identity types +// #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +// pub enum Identity { +// OrgSite(PubKey), +// IndividualSite(PubKey), +// OrgPublicStore(PubKey), +// OrgProtectedStore(PubKey), +// OrgPrivateStore(PubKey), +// IndividualPublicStore(PubKey), +// IndividualProtectedStore(PubKey), +// IndividualPrivateStore(PubKey), +// } pub type OuterOverlayId = Digest; diff --git a/ng-storage-rocksdb/src/block_storage.rs b/ng-storage-rocksdb/src/block_storage.rs index cf8985f..7db84ea 100644 --- a/ng-storage-rocksdb/src/block_storage.rs +++ b/ng-storage-rocksdb/src/block_storage.rs @@ -13,8 +13,11 @@ use ng_repo::types::*; use ng_repo::utils::*; use ng_repo::log::*; +use rocksdb::BlockBasedOptions; +use rocksdb::DBCompressionType; use std::path::Path; use std::sync::{Arc, RwLock}; +use std::thread::available_parallelism; use serde::{Deserialize, Serialize}; use serde_bare::error::Error; @@ -36,14 +39,51 @@ impl RocksDbBlockStorage { /// The key is the encryption key for the data at rest. pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result { let mut opts = Options::default(); + let default_parallelism_approx = available_parallelism() + .unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) + .get(); //opts.set_use_fsync(true); + opts.set_max_background_jobs(default_parallelism_approx as i32); + opts.increase_parallelism(default_parallelism_approx as i32); + + // the default WAL size is CF_nbr * write_buffer_size * max_write_buffer_number * 4 + opts.set_max_total_wal_size(256 * 1024 * 1024); + opts.set_write_buffer_size(64 * 1024 * 1024); // which is the default. might have to reduce this on smartphones. + opts.set_target_file_size_base(1024 * 1024); + opts.set_max_write_buffer_number(2); // the default + opts.set_level_zero_file_num_compaction_trigger(4); // the default + opts.set_max_bytes_for_level_base(16 * 1024 * 1024); + opts.set_target_file_size_multiplier(10); + opts.set_level_compaction_dynamic_level_bytes(true); + opts.create_if_missing(true); - opts.create_missing_column_families(true); + opts.create_missing_column_families(false); + opts.set_enable_blob_files(true); + // all values are going to BlobStore + opts.set_min_blob_size(0); + // set a low value (16M) for file_size to reduce space amplification + opts.set_blob_file_size(16 * 1024 * 1024); + // no need for compression, as the data is encrypted (it won't compress) + opts.set_blob_compression_type(DBCompressionType::None); + opts.set_enable_blob_gc(true); + // the oldest half of blob files will be selected for GC + opts.set_blob_gc_age_cutoff(0.5); + // in those oldest blob files, if 50% of it (8MB) is garbage, a forced compact will occur. + // this way we are reducing the space amplification by small decrements of 8MB + opts.set_blob_gc_force_threshold(0.5); + + let mut block_based_opts = BlockBasedOptions::default(); + // we will have a cache of decrypted objects, so there is no point in caching also the encrypted blocks. + block_based_opts.disable_cache(); + block_based_opts.set_block_size(16 * 1024); + block_based_opts.set_bloom_filter(10.0, false); + block_based_opts.set_format_version(6); + opts.set_block_based_table_factory(&block_based_opts); + let env = Env::enc_env(key).unwrap(); opts.set_env(&env); let tx_options = TransactionDBOptions::new(); - let db: TransactionDB = - TransactionDB::open_cf(&opts, &tx_options, &path, vec!["cf0", "cf1"]).unwrap(); + let db: TransactionDB = TransactionDB::open(&opts, &tx_options, &path).unwrap(); log_info!( "created blockstorage with Rocksdb Version: {}", diff --git a/ng-storage-rocksdb/src/kcv_storage.rs b/ng-storage-rocksdb/src/kcv_storage.rs index db4bae3..5570667 100644 --- a/ng-storage-rocksdb/src/kcv_storage.rs +++ b/ng-storage-rocksdb/src/kcv_storage.rs @@ -11,11 +11,14 @@ use ng_repo::kcv_storage::*; use ng_repo::errors::*; use ng_repo::log::*; +use rocksdb::BlockBasedOptions; +use rocksdb::Cache; use rocksdb::DBIteratorWithThreadMode; use std::collections::HashMap; use std::path::Path; use std::path::PathBuf; +use std::thread::available_parallelism; use rocksdb::{ ColumnFamily, ColumnFamilyDescriptor, Direction, Env, ErrorKind, IteratorMode, Options, @@ -654,14 +657,46 @@ impl RocksDbKCVStorage { /// The key is the encryption key for the data at rest. pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result { let mut opts = Options::default(); + let default_parallelism_approx = available_parallelism() + .unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) + .get(); //opts.set_use_fsync(true); + opts.set_max_background_jobs(default_parallelism_approx as i32); + opts.increase_parallelism(default_parallelism_approx as i32); + + // the default WAL size is CF_nbr * write_buffer_size * max_write_buffer_number * 4 + // we limit it to 1GB + opts.set_max_total_wal_size(1024 * 1024 * 1024); + opts.set_write_buffer_size(64 * 1024 * 1024); // which is the default. might have to reduce this on smartphones. + let nbr_of_cf = 1; + opts.set_db_write_buffer_size(64 * 1024 * 1024 * nbr_of_cf); + opts.set_target_file_size_base(64 * 1024 * 1024); // the default + opts.set_max_write_buffer_number(2); // the default + opts.set_level_zero_file_num_compaction_trigger(4); // the default + opts.set_max_bytes_for_level_base(256 * 1024 * 1024); + opts.set_target_file_size_multiplier(10); + opts.set_level_compaction_dynamic_level_bytes(true); + opts.create_if_missing(true); opts.create_missing_column_families(true); + + let mut block_based_opts = BlockBasedOptions::default(); + // we will have a cache of decrypted objects, so there is no point in caching also the encrypted blocks. + let cache = Cache::new_lru_cache(64 * 1024 * 1024); + block_based_opts.set_block_cache(&cache); + block_based_opts.set_cache_index_and_filter_blocks(true); + block_based_opts.set_block_size(16 * 1024); + block_based_opts.set_bloom_filter(10.0, false); + block_based_opts.set_format_version(6); + opts.set_block_based_table_factory(&block_based_opts); + let env = Env::enc_env(key).unwrap(); opts.set_env(&env); + + // TODO: use open_cf and choose which column family to create/ versus using set_prefix_extractor and doing prefix seek + let tx_options = TransactionDBOptions::new(); - let db: TransactionDB = - TransactionDB::open_cf(&opts, &tx_options, &path, vec!["cf0", "cf1"]).unwrap(); + let db: TransactionDB = TransactionDB::open(&opts, &tx_options, &path).unwrap(); log_info!( "created kcv storage with Rocksdb Version: {}",