/* * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers * All rights reserved. * Licensed under the Apache License, Version 2.0 * * or the MIT license , * at your option. All files in the project carrying such * notice may not be copied, modified, or distributed except * according to those terms. */ use std::collections::{HashMap, HashSet}; use std::fs::{read, File, OpenOptions}; use std::io::Write; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use ng_repo::block_storage::{BlockStorage, HashMapBlockStorage}; use ng_repo::errors::{ProtocolError, ServerError, StorageError}; use ng_repo::log::*; use ng_repo::object::Object; use ng_repo::store::Store; use ng_repo::types::*; use ng_net::types::*; use ng_storage_rocksdb::block_storage::RocksDbBlockStorage; use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; use crate::server_broker::*; use crate::server_storage::admin::{account::Account, invitation::Invitation, wallet::Wallet}; use crate::server_storage::core::*; pub(crate) struct RocksDbServerStorage { #[allow(dead_code)] wallet_storage: RocksDbKCVStorage, accounts_storage: RocksDbKCVStorage, //peers_storage: RocksDbKCVStorage, peers_last_seq_path: PathBuf, peers_last_seq: Mutex>, block_storage: Arc>, core_storage: RocksDbKCVStorage, } impl RocksDbServerStorage { pub(crate) fn open( path: &mut PathBuf, master_key: SymKey, admin_invite: Option, ) -> Result { // create/open the WALLET let mut wallet_path = path.clone(); wallet_path.push("wallet"); std::fs::create_dir_all(wallet_path.clone()).unwrap(); log_debug!("opening wallet DB"); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way let wallet_storage = RocksDbKCVStorage::open(&wallet_path, master_key.slice().clone())?; let wallet = Wallet::open(&wallet_storage); // create/open the ACCOUNTS storage let mut accounts_path = path.clone(); let accounts_key; accounts_path.push("accounts"); if admin_invite.is_some() && !accounts_path.exists() && !wallet.exists_accounts_key() { accounts_key = wallet.create_accounts_key()?; std::fs::create_dir_all(accounts_path.clone()).unwrap(); let accounts_storage = RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; let symkey = SymKey::random(); let invite_code = InvitationCode::Setup(symkey.clone()); let _ = Invitation::create( &invite_code, 0, &Some("admin user automatically invited at first startup".to_string()), &accounts_storage, )?; let invitation = ng_net::types::Invitation::V0(InvitationV0 { code: Some(symkey), name: Some("your Broker, as admin".into()), url: None, bootstrap: admin_invite.unwrap(), }); for link in invitation.get_urls() { println!("The admin invitation link is: {}", link) } } else { if admin_invite.is_some() { log_warn!("Cannot add an admin invitation anymore, as it is not the first start of the server."); } accounts_key = wallet.get_or_create_accounts_key()?; } log_debug!("opening accounts DB"); std::fs::create_dir_all(accounts_path.clone()).unwrap(); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way let accounts_storage = RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; // create/open the PEERS storage // log_debug!("opening peers DB"); // let peers_key = wallet.get_or_create_peers_key()?; // let mut peers_path = path.clone(); // peers_path.push("peers"); // std::fs::create_dir_all(peers_path.clone()).unwrap(); // //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way // let peers_storage = RocksDbKCVStorage::open(&peers_path, peers_key.slice().clone())?; // creates the path for peers_last_seq let mut peers_last_seq_path = path.clone(); peers_last_seq_path.push("peers_last_seq"); std::fs::create_dir_all(peers_last_seq_path.clone()).unwrap(); // opening block_storage let mut blocks_path = path.clone(); blocks_path.push("blocks"); std::fs::create_dir_all(blocks_path.clone()).unwrap(); let blocks_key = wallet.get_or_create_blocks_key()?; let block_storage = Arc::new(std::sync::RwLock::new(RocksDbBlockStorage::open( &blocks_path, *blocks_key.slice(), )?)); // create/open the PEERS storage log_debug!("opening core DB"); let core_key = wallet.get_or_create_core_key()?; let mut core_path = path.clone(); core_path.push("core"); std::fs::create_dir_all(core_path.clone()).unwrap(); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way #[cfg(debug_assertions)] let mut core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?; #[cfg(not(debug_assertions))] let core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?; // check unicity of class prefixes, by storage #[cfg(debug_assertions)] { // TODO: refactor the wallet and accounts with Class and the new OKM mechanism, then include them uncomment the following lines //log_debug!("CHECKING..."); // wallet_storage.add_class(&Wallet::CLASS); // wallet_storage.check_prefixes(); // accounts_storage.add_class(&Account::CLASS); // accounts_storage.add_class(&Invitation::CLASS); // accounts_storage.check_prefixes(); core_storage.add_class(&TopicStorage::CLASS); core_storage.add_class(&RepoHashStorage::CLASS); core_storage.add_class(&OverlayStorage::CLASS); core_storage.add_class(&CommitStorage::CLASS); core_storage.check_prefixes(); } Ok(RocksDbServerStorage { wallet_storage, accounts_storage, //peers_storage, peers_last_seq_path, peers_last_seq: Mutex::new(HashMap::new()), block_storage, core_storage, }) } pub(crate) fn get_block_storage( &self, ) -> Arc> { Arc::clone(&self.block_storage) } pub(crate) fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { // for now we don't use the hashmap. // TODO: let's see if the lock is even needed let _peers_last_seq = self.peers_last_seq.lock(); let mut filename = self.peers_last_seq_path.clone(); filename.push(format!("{}", peer)); let file = read(filename.clone()); let mut file_save = match file { Ok(ser) => { let last: u64 = serde_bare::from_slice(&ser).map_err(|_| ServerError::FileError)?; if last >= seq { return Err(ServerError::SequenceMismatch); } OpenOptions::new() .write(true) .open(filename) .map_err(|_| ServerError::FileError)? } Err(_) => File::create(filename).map_err(|_| ServerError::FileError)?, }; let ser = serde_bare::to_vec(&seq).unwrap(); file_save .write_all(&ser) .map_err(|_| ServerError::FileError)?; file_save.sync_data().map_err(|_| ServerError::FileError)?; Ok(()) } pub(crate) fn get_user(&self, user_id: PubKey) -> Result { log_debug!("get_user {user_id}"); Ok(Account::open(&user_id, &self.accounts_storage)?.is_admin()?) } pub(crate) fn has_no_user(&self) -> Result { Ok(!Account::has_users(&self.accounts_storage)?) } /// returns the credentials, storage_master_key, and peer_priv_key pub(crate) fn get_user_credentials( &self, user_id: &PubKey, ) -> Result { log_debug!("get_user_credentials {user_id}"); let acc = Account::open(user_id, &self.accounts_storage)?; Ok(acc.get_credentials()?) } pub(crate) fn add_user(&self, user_id: PubKey, is_admin: bool) -> Result<(), ProtocolError> { log_debug!("add_user {user_id} is admin {is_admin}"); Account::create(&user_id, is_admin, &self.accounts_storage)?; Ok(()) } pub(crate) fn add_user_credentials( &self, user_id: &PubKey, credentials: &Credentials, ) -> Result<(), ProtocolError> { log_debug!("add_user_credentials {user_id}"); let acc = Account::create(&user_id, false, &self.accounts_storage)?; acc.add_credentials(credentials)?; //let storage_key = SymKey::random(); //let peer_priv_key = PrivKey::random_ed(); //acc.add_user_keys(&storage_key, &peer_priv_key)?; Ok(()) } pub(crate) fn del_user(&self, user_id: PubKey) -> Result<(), ProtocolError> { log_debug!("del_user {user_id}"); let acc = Account::open(&user_id, &self.accounts_storage)?; acc.del()?; // TODO: stop the verifier, if any Ok(()) } pub(crate) fn list_users(&self, admins: bool) -> Result, ProtocolError> { log_debug!("list_users that are admin == {admins}"); Ok(Account::get_all_users(admins, &self.accounts_storage)?) } pub(crate) fn list_invitations( &self, admin: bool, unique: bool, multi: bool, ) -> Result)>, ProtocolError> { log_debug!("list_invitations admin={admin} unique={unique} multi={multi}"); Ok(Invitation::get_all_invitations( &self.accounts_storage, admin, unique, multi, )?) } pub(crate) fn add_invitation( &self, invite_code: &InvitationCode, expiry: u32, memo: &Option, ) -> Result<(), ProtocolError> { log_debug!("add_invitation {invite_code} expiry {expiry}"); Invitation::create(invite_code, expiry, memo, &self.accounts_storage)?; Ok(()) } pub(crate) fn get_invitation_type(&self, invite_code: [u8; 32]) -> Result { log_debug!("get_invitation_type {:?}", invite_code); let inv = Invitation::open(&invite_code, &self.accounts_storage)?; inv.get_type() } pub(crate) fn remove_invitation(&self, invite_code: [u8; 32]) -> Result<(), ProtocolError> { log_debug!("remove_invitation {:?}", invite_code); let inv = Invitation::open(&invite_code, &self.accounts_storage)?; inv.del()?; Ok(()) } pub(crate) fn get_repo_pin_status( &self, overlay: &OverlayId, repo: &RepoHash, user: &UserId, ) -> Result { let repo_info = RepoHashStorage::load_for_user(user, repo, overlay, &self.core_storage)?; let mut topics = vec![]; for topic in repo_info.topics { if let Ok(mut model) = TopicStorage::open(&topic, overlay, &self.core_storage) { match TopicStorage::USERS.get(&mut model, user) { Err(_) => {} Ok(publisher) => topics.push(TopicSubRes::new_from_heads( TopicStorage::get_all_heads(&mut model)?, publisher, topic, TopicStorage::COMMITS_NBR.get(&mut model)?, )), } } } if topics.is_empty() { return Err(ServerError::False); } Ok(RepoPinStatus::V0(RepoPinStatusV0 { hash: repo.clone(), expose_outer: repo_info.expose_outer.len() > 0, topics, })) } pub(crate) fn pin_repo_write( &self, overlay_access: &OverlayAccess, repo: &RepoHash, user_id: &UserId, ro_topics: &Vec, rw_topics: &Vec, overlay_root_topic: &Option, expose_outer: bool, ) -> Result { assert!(!overlay_access.is_read_only()); // TODO: all the below DB operations should be done inside a single transaction. need refactor of Object-KCV-Mapping to take an optional transaction. let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose(); let mut inner_overlay_storage = match OverlayStorage::open(inner_overlay, &self.core_storage) { Err(StorageError::NotFound) => { // inner overlay doesn't exist, we need to create it OverlayStorage::create( inner_overlay, &(*overlay_access).into(), expose_outer, &self.core_storage, )? } Err(e) => return Err(e.into()), Ok(os) => os, }; // the overlay we use to store all the info is: the outer for a RW access, and the inner for a WO access. let overlay = match inner_overlay_storage.overlay_type() { OverlayType::Outer(_) | OverlayType::OuterOnly => { panic!("shouldnt happen: we are pinning to an inner overlay. why is it outer type?") } OverlayType::Inner(outer) => outer, OverlayType::InnerOnly => inner_overlay, } .clone(); // if an overlay_root_topic was provided, we update it in the DB: // this information is stored on the inner overlay record, contrary to the rest of the info below, that is stored on the outer (except for WO) if overlay_root_topic.is_some() { OverlayStorage::TOPIC.set( &mut inner_overlay_storage, overlay_root_topic.as_ref().unwrap(), )?; } // we now do the pinning : let mut result: RepoOpened = vec![]; let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?; if expose_outer { RepoHashStorage::EXPOSE_OUTER.add(&mut repo_info, user_id)?; } let mut rw_topics_added: HashMap = HashMap::with_capacity(rw_topics.len()); for topic in rw_topics { let topic_id = topic.topic_id(); let mut topic_storage = TopicStorage::create(topic_id, &overlay, repo, &self.core_storage, true)?; RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic_id)?; let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, topic)?; TopicStorage::USERS.add_or_change(&mut topic_storage, user_id, &true)?; rw_topics_added.insert( *topic_id, TopicSubRes::new_from_heads( TopicStorage::get_all_heads(&mut topic_storage)?, true, *topic_id, TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, ), ); } for topic in ro_topics { if rw_topics_added.contains_key(topic) { continue; //we do not want to add again as read_only, a topic that was just opened as RW (publisher) } let mut topic_storage = TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?; RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?; let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?; result.push(TopicSubRes::new_from_heads( TopicStorage::get_all_heads(&mut topic_storage)?, false, *topic, TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )); } result.extend(rw_topics_added.into_values()); Ok(result) } pub(crate) fn pin_repo_read( &self, overlay: &OverlayId, repo: &RepoHash, user_id: &UserId, ro_topics: &Vec, ) -> Result { let mut overlay_storage = OverlayStorage::open(overlay, &self.core_storage)?; match overlay_storage.overlay_type() { OverlayType::Outer(_) => { let mut result: RepoOpened = vec![]; let repo_info = RepoHashStorage::load_topics(repo, overlay, &self.core_storage)?; for topic in ro_topics { if repo_info.topics.contains(topic) { let mut topic_storage = TopicStorage::open(topic, overlay, &self.core_storage)?; let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?; result.push(TopicSubRes::new_from_heads( TopicStorage::get_all_heads(&mut topic_storage)?, false, *topic, TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )); } } Ok(result) } _ => return Err(ServerError::NotFound), } } fn check_overlay(&self, overlay: &OverlayId) -> Result { let mut overlay_storage = OverlayStorage::open(overlay, &self.core_storage).map_err(|e| match e { StorageError::NotFound => ServerError::OverlayNotFound, _ => e.into(), })?; Ok(match overlay_storage.overlay_type() { OverlayType::OuterOnly => { if overlay.is_outer() { *overlay } else { return Err(ServerError::OverlayMismatch); } } OverlayType::Outer(_) => { if overlay.is_outer() { *overlay } else { return Err(ServerError::OverlayMismatch); } } OverlayType::Inner(outer) => { if outer.is_outer() { *outer } else { return Err(ServerError::OverlayMismatch); } } OverlayType::InnerOnly => { if overlay.is_inner() { *overlay } else { return Err(ServerError::OverlayMismatch); } } }) } pub(crate) fn topic_sub( &self, overlay: &OverlayId, repo: &RepoHash, topic: &TopicId, user_id: &UserId, publisher: Option<&PublisherAdvert>, ) -> Result { let overlay = self.check_overlay(overlay)?; // now we check that the repo was previously pinned. // if it was opened but not pinned, then this should be dealt with in the ServerBroker, in memory, not here) let is_publisher = publisher.is_some(); // (we already checked that the advert is valid) let mut topic_storage = TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?; let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &is_publisher)?; if is_publisher { let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, publisher.unwrap())?; } let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?; RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?; Ok(TopicSubRes::new_from_heads( TopicStorage::get_all_heads(&mut topic_storage)?, is_publisher, *topic, TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )) } pub(crate) fn get_commit( &self, overlay: &OverlayId, id: &ObjectId, ) -> Result, ServerError> { let overlay = self.check_overlay(overlay)?; let mut commit_storage = CommitStorage::open(id, &overlay, &self.core_storage)?; let event_info = commit_storage .event() .as_ref() .left() .ok_or(ServerError::NotFound)?; // TODO: for now we do not deal with events that have been removed from storage let mut blocks = Vec::with_capacity(event_info.blocks.len()); for block_id in event_info.blocks.iter() { let block = self.block_storage.read().unwrap().get(&overlay, block_id)?; blocks.push(block); } Ok(blocks) } pub(crate) fn has_block( &self, overlay: &OverlayId, block_id: &BlockId, ) -> Result<(), ServerError> { let overlay = self.check_overlay(overlay)?; let overlay = &overlay; Ok(self.block_storage.read().unwrap().has(overlay, block_id)?) } pub(crate) fn get_block( &self, overlay: &OverlayId, block_id: &BlockId, ) -> Result { let overlay = self.check_overlay(overlay)?; let overlay = &overlay; Ok(self.block_storage.read().unwrap().get(overlay, block_id)?) } pub(crate) fn add_block( &self, overlay: &OverlayId, block: Block, ) -> Result { if overlay.is_outer() { // we don't publish events on the outer overlay! return Err(ServerError::OverlayMismatch); } let overlay = self.check_overlay(overlay)?; let overlay = &overlay; let mut overlay_storage = OverlayStorage::new(overlay, &self.core_storage); Ok(self.add_block_(overlay, &mut overlay_storage, block)?) } fn add_block_( &self, overlay_id: &OverlayId, overlay_storage: &mut OverlayStorage, block: Block, ) -> Result { let block_id = self .block_storage .write() .unwrap() .put(overlay_id, &block, true)?; OverlayStorage::BLOCKS.increment(overlay_storage, &block_id)?; Ok(block_id) } pub(crate) fn save_event( &self, overlay: &OverlayId, event: Event, user_id: &UserId, ) -> Result { if overlay.is_outer() { // we don't publish events on the outer overlay! return Err(ServerError::OverlayMismatch); } let overlay = self.check_overlay(overlay)?; let overlay = &overlay; // TODO: check that the sequence number is correct let topic = *event.topic_id(); // check that the topic exists and that this user has pinned it as publisher let mut topic_storage = TopicStorage::open(&topic, overlay, &self.core_storage).map_err(|e| match e { StorageError::NotFound => ServerError::TopicNotFound, _ => e.into(), })?; let is_publisher = TopicStorage::USERS .get(&mut topic_storage, user_id) .map_err(|e| match e { StorageError::NotFound => ServerError::AccessDenied, _ => e.into(), })?; if !is_publisher { return Err(ServerError::AccessDenied); } //log_info!("SAVED EVENT in overlay {:?} : {}", overlay, event); // remove the blocks from inside the event, and save the "dehydrated" event and each block separately. match event { Event::V0(mut v0) => { let mut overlay_storage = OverlayStorage::new(overlay, &self.core_storage); let mut extracted_blocks_ids = Vec::with_capacity(v0.content.blocks.len()); let first_block_copy = v0.content.blocks[0].clone(); let temp_mini_block_storage = HashMapBlockStorage::new(); for block in v0.content.blocks { let _ = temp_mini_block_storage.put(overlay, &block, false)?; extracted_blocks_ids.push(self.add_block_( overlay, &mut overlay_storage, block, )?); } // creating a temporary store to access the blocks let temp_store = Store::new_from_overlay_id( overlay, Arc::new(std::sync::RwLock::new(temp_mini_block_storage)), ); let commit_id = extracted_blocks_ids[0]; let header = Object::load_header(&first_block_copy, &temp_store).map_err(|_e| { //log_err!("err : {:?}", e); ServerError::InvalidHeader })?; v0.content.blocks = vec![]; let event_info = EventInfo { event: Event::V0(v0), blocks: extracted_blocks_ids, }; CommitStorage::create( &commit_id, overlay, event_info, &header, true, &self.core_storage, )?; let past = if header.is_some() { HashSet::from_iter(header.unwrap().acks_and_nacks()) } else { HashSet::new() }; let head = HashSet::from([commit_id]); //TODO: current_heads in TopicInfo in ServerBroker is not updated (but it isn't used so far) TopicStorage::HEADS.remove_from_set_and_add(&mut topic_storage, past, head)?; TopicStorage::COMMITS_NBR.increment(&mut topic_storage)?; } } Ok(topic) } pub(crate) fn topic_sync_req( &self, overlay: &OverlayId, topic: &TopicId, known_heads: &Vec, target_heads: &Vec, known_commits: &Option, ) -> Result, ServerError> { let overlay = self.check_overlay(overlay)?; // quick solution for now using the Branch::sync_req. TODO: use the saved references (ACKS,DEPS) in the server_storage, to have much quicker responses let target_heads = if target_heads.is_empty() { // get the current_heads let mut topic_storage = TopicStorage::new(topic, &overlay, &self.core_storage); let heads = TopicStorage::get_all_heads(&mut topic_storage)?; if heads.is_empty() { return Err(ServerError::TopicNotFound); } Box::new(heads.into_iter()) as Box> } else { Box::new(target_heads.iter().cloned()) as Box> }; let store = Store::new_from_overlay_id(&overlay, Arc::clone(&self.block_storage)); let commits = Branch::sync_req(target_heads, known_heads, known_commits, &store) .map_err(|_| ServerError::MalformedBranch)?; let mut result = Vec::with_capacity(commits.len()); for commit_id in commits { let commit_storage = CommitStorage::open(&commit_id, &overlay, &self.core_storage)?; let mut event_info = commit_storage .take_event() .left() .ok_or(ServerError::NotFound)?; // TODO: for now we do not deal with events that have been removed from storage // rehydrate the event : let mut blocks = Vec::with_capacity(event_info.blocks.len()); for block_id in event_info.blocks { let block = store.get(&block_id)?; blocks.push(block); } match event_info.event { Event::V0(ref mut v0) => { v0.content.blocks = blocks; } } result.push(TopicSyncRes::V0(TopicSyncResV0::Event(event_info.event))); } Ok(result) } }