|
|
@ -13,8 +13,11 @@ use ng_repo::types::*; |
|
|
|
use ng_repo::utils::*; |
|
|
|
use ng_repo::utils::*; |
|
|
|
|
|
|
|
|
|
|
|
use ng_repo::log::*; |
|
|
|
use ng_repo::log::*; |
|
|
|
|
|
|
|
use rocksdb::BlockBasedOptions; |
|
|
|
|
|
|
|
use rocksdb::DBCompressionType; |
|
|
|
use std::path::Path; |
|
|
|
use std::path::Path; |
|
|
|
use std::sync::{Arc, RwLock}; |
|
|
|
use std::sync::{Arc, RwLock}; |
|
|
|
|
|
|
|
use std::thread::available_parallelism; |
|
|
|
|
|
|
|
|
|
|
|
use serde::{Deserialize, Serialize}; |
|
|
|
use serde::{Deserialize, Serialize}; |
|
|
|
use serde_bare::error::Error; |
|
|
|
use serde_bare::error::Error; |
|
|
@ -36,14 +39,51 @@ impl RocksDbBlockStorage { |
|
|
|
/// The key is the encryption key for the data at rest.
|
|
|
|
/// The key is the encryption key for the data at rest.
|
|
|
|
pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<RocksDbBlockStorage, StorageError> { |
|
|
|
pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<RocksDbBlockStorage, StorageError> { |
|
|
|
let mut opts = Options::default(); |
|
|
|
let mut opts = Options::default(); |
|
|
|
|
|
|
|
let default_parallelism_approx = available_parallelism() |
|
|
|
|
|
|
|
.unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) |
|
|
|
|
|
|
|
.get(); |
|
|
|
//opts.set_use_fsync(true);
|
|
|
|
//opts.set_use_fsync(true);
|
|
|
|
|
|
|
|
opts.set_max_background_jobs(default_parallelism_approx as i32); |
|
|
|
|
|
|
|
opts.increase_parallelism(default_parallelism_approx as i32); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// the default WAL size is CF_nbr * write_buffer_size * max_write_buffer_number * 4
|
|
|
|
|
|
|
|
opts.set_max_total_wal_size(256 * 1024 * 1024); |
|
|
|
|
|
|
|
opts.set_write_buffer_size(64 * 1024 * 1024); // which is the default. might have to reduce this on smartphones.
|
|
|
|
|
|
|
|
opts.set_target_file_size_base(1024 * 1024); |
|
|
|
|
|
|
|
opts.set_max_write_buffer_number(2); // the default
|
|
|
|
|
|
|
|
opts.set_level_zero_file_num_compaction_trigger(4); // the default
|
|
|
|
|
|
|
|
opts.set_max_bytes_for_level_base(16 * 1024 * 1024); |
|
|
|
|
|
|
|
opts.set_target_file_size_multiplier(10); |
|
|
|
|
|
|
|
opts.set_level_compaction_dynamic_level_bytes(true); |
|
|
|
|
|
|
|
|
|
|
|
opts.create_if_missing(true); |
|
|
|
opts.create_if_missing(true); |
|
|
|
opts.create_missing_column_families(true); |
|
|
|
opts.create_missing_column_families(false); |
|
|
|
|
|
|
|
opts.set_enable_blob_files(true); |
|
|
|
|
|
|
|
// all values are going to BlobStore
|
|
|
|
|
|
|
|
opts.set_min_blob_size(0); |
|
|
|
|
|
|
|
// set a low value (16M) for file_size to reduce space amplification
|
|
|
|
|
|
|
|
opts.set_blob_file_size(16 * 1024 * 1024); |
|
|
|
|
|
|
|
// no need for compression, as the data is encrypted (it won't compress)
|
|
|
|
|
|
|
|
opts.set_blob_compression_type(DBCompressionType::None); |
|
|
|
|
|
|
|
opts.set_enable_blob_gc(true); |
|
|
|
|
|
|
|
// the oldest half of blob files will be selected for GC
|
|
|
|
|
|
|
|
opts.set_blob_gc_age_cutoff(0.5); |
|
|
|
|
|
|
|
// in those oldest blob files, if 50% of it (8MB) is garbage, a forced compact will occur.
|
|
|
|
|
|
|
|
// this way we are reducing the space amplification by small decrements of 8MB
|
|
|
|
|
|
|
|
opts.set_blob_gc_force_threshold(0.5); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let mut block_based_opts = BlockBasedOptions::default(); |
|
|
|
|
|
|
|
// we will have a cache of decrypted objects, so there is no point in caching also the encrypted blocks.
|
|
|
|
|
|
|
|
block_based_opts.disable_cache(); |
|
|
|
|
|
|
|
block_based_opts.set_block_size(16 * 1024); |
|
|
|
|
|
|
|
block_based_opts.set_bloom_filter(10.0, false); |
|
|
|
|
|
|
|
block_based_opts.set_format_version(6); |
|
|
|
|
|
|
|
opts.set_block_based_table_factory(&block_based_opts); |
|
|
|
|
|
|
|
|
|
|
|
let env = Env::enc_env(key).unwrap(); |
|
|
|
let env = Env::enc_env(key).unwrap(); |
|
|
|
opts.set_env(&env); |
|
|
|
opts.set_env(&env); |
|
|
|
let tx_options = TransactionDBOptions::new(); |
|
|
|
let tx_options = TransactionDBOptions::new(); |
|
|
|
let db: TransactionDB = |
|
|
|
let db: TransactionDB = TransactionDB::open(&opts, &tx_options, &path).unwrap(); |
|
|
|
TransactionDB::open_cf(&opts, &tx_options, &path, vec!["cf0", "cf1"]).unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
log_info!( |
|
|
|
log_info!( |
|
|
|
"created blockstorage with Rocksdb Version: {}", |
|
|
|
"created blockstorage with Rocksdb Version: {}", |
|
|
|