From d84f0e051d120a98d0d157aa44fc8becebd5e6ec Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Fri, 26 Apr 2024 00:00:38 +0300 Subject: [PATCH] bootstrap the personal site on webapp --- Cargo.lock | 1 + nextgraph/src/local_broker.rs | 7 +- ng-broker/src/broker_storage/account.rs | 4 +- ng-broker/src/broker_storage/wallet.rs | 14 +- ng-broker/src/server_storage.rs | 50 +++++-- ng-broker/src/server_ws.rs | 6 +- ng-net/src/actor.rs | 6 +- ng-net/src/actors/client/commit_get.rs | 125 +++++++++++++++++ ng-net/src/actors/client/mod.rs | 2 + ng-net/src/broker.rs | 5 +- ng-net/src/connection.rs | 2 +- ng-net/src/server_storage.rs | 2 + ng-net/src/types.rs | 59 +++++++- ng-repo/src/block.rs | 8 ++ ng-repo/src/block_storage.rs | 5 + ng-repo/src/branch.rs | 2 +- ng-repo/src/commit.rs | 58 +++++--- ng-repo/src/errors.rs | 33 ++++- ng-repo/src/event.rs | 2 +- ng-repo/src/object.rs | 30 +++- ng-storage-rocksdb/src/kcv_storage.rs | 30 ++-- ng-verifier/Cargo.toml | 1 + ng-verifier/src/rocksdb_user_storage.rs | 6 +- ng-verifier/src/types.rs | 8 ++ ng-verifier/src/verifier.rs | 176 ++++++++++++++---------- ngone/src/main.rs | 6 +- 26 files changed, 496 insertions(+), 152 deletions(-) create mode 100644 ng-net/src/actors/client/commit_get.rs diff --git a/Cargo.lock b/Cargo.lock index e718830..3fa4f3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3421,6 +3421,7 @@ dependencies = [ "blake3", "chacha20", "either", + "futures", "getrandom 0.2.10", "ng-net", "ng-repo", diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index dc361dd..9b89129 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -113,7 +113,7 @@ impl JsStorageConfig { let start_key = format!("ng_outboxes@{}@start", peer_id); let res = (session_read4)(start_key.clone()); let _start = match res { - Err(_) => return Err(NgError::NotFound), + Err(_) => return Err(NgError::JsStorageKeyNotFound), Ok(start_str) => start_str .parse::() .map_err(|_| NgError::InvalidFileFormat)?, @@ -1310,6 +1310,11 @@ pub async fn user_connect_with_device_info( log_info!("SENDING EVENTS FROM OUTBOX: {:?}", res); // TODO: load verifier from remote connection (if not RocksDb type) + if let Err(e) = session.verifier.bootstrap().await { + session.verifier.connected_server_id = None; + Broker::close_all_connections().await; + tried.as_mut().unwrap().3 = Some(e.to_string()); + } break; } else { diff --git a/ng-broker/src/broker_storage/account.rs b/ng-broker/src/broker_storage/account.rs index 4a0e13f..96cdef7 100644 --- a/ng-broker/src/broker_storage/account.rs +++ b/ng-broker/src/broker_storage/account.rs @@ -248,7 +248,7 @@ mod test { use ng_repo::errors::StorageError; use ng_repo::types::*; use ng_repo::utils::*; - use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; + use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; use std::fs; use tempfile::Builder; @@ -261,7 +261,7 @@ mod test { let key: [u8; 32] = [0; 32]; fs::create_dir_all(root.path()).unwrap(); println!("{}", root.path().to_str().unwrap()); - let mut store = RocksdbKCVStorage::open(root.path(), key).unwrap(); + let mut store = RocksDbKCVStorage::open(root.path(), key).unwrap(); let user_id = PubKey::Ed25519PubKey([1; 32]); diff --git a/ng-broker/src/broker_storage/wallet.rs b/ng-broker/src/broker_storage/wallet.rs index 49afa5d..229e386 100644 --- a/ng-broker/src/broker_storage/wallet.rs +++ b/ng-broker/src/broker_storage/wallet.rs @@ -23,12 +23,14 @@ pub struct Wallet<'a> { } impl<'a> Wallet<'a> { - const PREFIX: u8 = b"w"[0]; - const PREFIX_OVERLAY: u8 = b"o"[0]; - const PREFIX_USER: u8 = b"u"[0]; + const PREFIX: u8 = b'w'; + const PREFIX_OVERLAY: u8 = b'o'; + const PREFIX_USER: u8 = b'u'; const KEY_ACCOUNTS: [u8; 8] = *b"accounts"; const KEY_PEERS: [u8; 5] = *b"peers"; + const KEY_CORE: [u8; 4] = *b"core"; + const KEY_BLOCKS: [u8; 6] = *b"blocks"; // propertie's suffixes const SYM_KEY: u8 = b"s"[0]; @@ -110,6 +112,12 @@ impl<'a> Wallet<'a> { pub fn get_or_create_peers_key(&self) -> Result { self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec()) } + pub fn get_or_create_blocks_key(&self) -> Result { + self.get_or_create_single_key(Self::PREFIX, &Self::KEY_BLOCKS.to_vec()) + } + pub fn get_or_create_core_key(&self) -> Result { + self.get_or_create_single_key(Self::PREFIX, &Self::KEY_CORE.to_vec()) + } pub fn get_or_create_accounts_key(&self) -> Result { self.get_or_create_single_key(Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec()) } diff --git a/ng-broker/src/server_storage.rs b/ng-broker/src/server_storage.rs index b831a5a..2035267 100644 --- a/ng-broker/src/server_storage.rs +++ b/ng-broker/src/server_storage.rs @@ -25,17 +25,20 @@ use ng_repo::errors::{ProtocolError, ServerError, StorageError}; use ng_repo::kcv_storage::KCVStorage; use ng_repo::log::*; use ng_repo::types::*; -use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; +use ng_storage_rocksdb::block_storage::RocksDbBlockStorage; +use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; -pub struct RocksdbServerStorage { - wallet_storage: RocksdbKCVStorage, - accounts_storage: RocksdbKCVStorage, - peers_storage: RocksdbKCVStorage, +pub struct RocksDbServerStorage { + wallet_storage: RocksDbKCVStorage, + accounts_storage: RocksDbKCVStorage, + peers_storage: RocksDbKCVStorage, peers_last_seq_path: PathBuf, peers_last_seq: Mutex>, + block_storage: RocksDbBlockStorage, + core_storage: RocksDbKCVStorage, } -impl RocksdbServerStorage { +impl RocksDbServerStorage { pub fn open( path: &mut PathBuf, master_key: SymKey, @@ -47,7 +50,7 @@ impl RocksdbServerStorage { std::fs::create_dir_all(wallet_path.clone()).unwrap(); log_debug!("opening wallet DB"); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way - let wallet_storage = RocksdbKCVStorage::open(&wallet_path, master_key.slice().clone())?; + let wallet_storage = RocksDbKCVStorage::open(&wallet_path, master_key.slice().clone())?; let wallet = Wallet::open(&wallet_storage); // create/open the ACCOUNTS storage @@ -59,7 +62,7 @@ impl RocksdbServerStorage { accounts_key = wallet.create_accounts_key()?; std::fs::create_dir_all(accounts_path.clone()).unwrap(); let accounts_storage = - RocksdbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; + RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; let symkey = SymKey::random(); let invite_code = InvitationCode::Admin(symkey.clone()); let _ = Invitation::create( @@ -87,7 +90,7 @@ impl RocksdbServerStorage { std::fs::create_dir_all(accounts_path.clone()).unwrap(); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way let accounts_storage = - RocksdbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; + RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; // create/open the PEERS storage log_debug!("opening peers DB"); @@ -96,24 +99,42 @@ impl RocksdbServerStorage { peers_path.push("peers"); std::fs::create_dir_all(peers_path.clone()).unwrap(); //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way - let peers_storage = RocksdbKCVStorage::open(&peers_path, peers_key.slice().clone())?; + let peers_storage = RocksDbKCVStorage::open(&peers_path, peers_key.slice().clone())?; // creates the path for peers_last_seq let mut peers_last_seq_path = path.clone(); peers_last_seq_path.push("peers_last_seq"); std::fs::create_dir_all(peers_last_seq_path.clone()).unwrap(); - Ok(RocksdbServerStorage { + // opening block_storage + let mut blocks_path = path.clone(); + blocks_path.push("blocks"); + std::fs::create_dir_all(blocks_path.clone()).unwrap(); + let blocks_key = wallet.get_or_create_blocks_key()?; + let block_storage = RocksDbBlockStorage::open(&blocks_path, *blocks_key.slice())?; + + // create/open the PEERS storage + log_debug!("opening core DB"); + let core_key = wallet.get_or_create_core_key()?; + let mut core_path = path.clone(); + core_path.push("core"); + std::fs::create_dir_all(core_path.clone()).unwrap(); + //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way + let core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?; + + Ok(RocksDbServerStorage { wallet_storage, accounts_storage, peers_storage, peers_last_seq_path, peers_last_seq: Mutex::new(HashMap::new()), + block_storage, + core_storage, }) } } -impl ServerStorage for RocksdbServerStorage { +impl ServerStorage for RocksDbServerStorage { fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { // for now we don't use the hashmap. // TODO: let's see if the lock is even needed @@ -248,4 +269,9 @@ impl ServerStorage for RocksdbServerStorage { publisher: publisher.is_some(), })) } + + fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError> { + //TODO: implement correctly ! + Ok(vec![Block::dummy()]) + } } diff --git a/ng-broker/src/server_ws.rs b/ng-broker/src/server_ws.rs index 0f7c8d9..50256d0 100644 --- a/ng-broker/src/server_ws.rs +++ b/ng-broker/src/server_ws.rs @@ -12,7 +12,7 @@ //! WebSocket implementation of the Broker use crate::interfaces::*; -use crate::server_storage::RocksdbServerStorage; +use crate::server_storage::RocksDbServerStorage; use crate::types::*; use async_std::io::ReadExt; use async_std::net::{TcpListener, TcpStream}; @@ -586,7 +586,7 @@ pub async fn run_server_accept_one( // let master_key: [u8; 32] = [0; 32]; // std::fs::create_dir_all(root.path()).unwrap(); // log_debug!("data directory: {}", root.path().to_str().unwrap()); - // let store = RocksdbKCVStorage::open(root.path(), master_key); + // let store = RocksDbKCVStorage::open(root.path(), master_key); let socket = TcpListener::bind(addrs.as_str()).await?; log_debug!("Listening on {}", addrs.as_str()); @@ -777,7 +777,7 @@ pub async fn run_server_v0( std::fs::create_dir_all(path.clone()).unwrap(); // opening the server storage (that contains the encryption keys for each store/overlay ) - let broker_storage = RocksdbServerStorage::open( + let broker_storage = RocksDbServerStorage::open( &mut path, wallet_master_key, if admin_invite { diff --git a/ng-net/src/actor.rs b/ng-net/src/actor.rs index cab6194..4f78b17 100644 --- a/ng-net/src/actor.rs +++ b/ng-net/src/actor.rs @@ -14,6 +14,7 @@ use async_std::stream::StreamExt; use async_std::sync::Mutex; use futures::{channel::mpsc, SinkExt}; +use ng_repo::log::*; use std::any::TypeId; use std::sync::Arc; @@ -128,7 +129,10 @@ impl< && TypeId::of::() != TypeId::of::<()>() { let (mut b_sender, b_receiver) = mpsc::unbounded::(); - let response = msg.try_into().map_err(|_e| ProtocolError::ActorError)?; + let response = msg.try_into().map_err(|e| { + log_err!("msg.try_into {}", e); + ProtocolError::ActorError + })?; b_sender .send(response) .await diff --git a/ng-net/src/actors/client/commit_get.rs b/ng-net/src/actors/client/commit_get.rs new file mode 100644 index 0000000..b1aa5d9 --- /dev/null +++ b/ng-net/src/actors/client/commit_get.rs @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ +use crate::broker::{ServerConfig, BROKER}; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; +use async_std::sync::Mutex; +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::types::{Block, OverlayId, PubKey}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +impl CommitGet { + pub fn get_actor(&self, id: i64) -> Box { + Actor::::new_responder(id) + } + + pub fn overlay(&self) -> &OverlayId { + match self { + Self::V0(v0) => v0.overlay.as_ref().unwrap(), + } + } + pub fn set_overlay(&mut self, overlay: OverlayId) { + match self { + Self::V0(v0) => v0.overlay = Some(overlay), + } + } +} + +impl TryFrom for CommitGet { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let req: ClientRequestContentV0 = msg.try_into()?; + if let ClientRequestContentV0::CommitGet(a) = req { + Ok(a) + } else { + log_debug!("INVALID {:?}", req); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(msg: CommitGet) -> ProtocolMessage { + let overlay = *msg.overlay(); + ProtocolMessage::from_client_request_v0(ClientRequestContentV0::CommitGet(msg), overlay) + } +} + +impl TryFrom for Block { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let res: ClientResponseContentV0 = msg.try_into()?; + if let ClientResponseContentV0::Block(a) = res { + Ok(a) + } else { + log_debug!("INVALID {:?}", res); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(b: Block) -> ProtocolMessage { + let mut cr: ClientResponse = ClientResponseContentV0::Block(b).into(); + cr.set_result(ServerError::PartialContent.into()); + cr.into() + } +} + +impl Actor<'_, CommitGet, Block> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, CommitGet, Block> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = CommitGet::try_from(msg)?; + log_info!("GOT CommitGet {:?}", req); + let broker = BROKER.read().await; + let blocks_res = broker + .get_server_storage()? + .get_commit(req.overlay(), req.id()); + + match blocks_res { + Ok(blocks) => { + if blocks.len() == 0 { + let re: Result<(), ServerError> = Err(ServerError::EmptyStream); + fsm.lock() + .await + .send_in_reply_to(re.into(), self.id()) + .await?; + return Ok(()); + } + let mut lock = fsm.lock().await; + + for block in blocks { + lock.send_in_reply_to(block.into(), self.id()).await?; + } + let re: Result<(), ServerError> = Err(ServerError::EndOfStream); + lock.send_in_reply_to(re.into(), self.id()).await?; + } + Err(e) => { + let re: Result<(), ServerError> = Err(e); + fsm.lock() + .await + .send_in_reply_to(re.into(), self.id()) + .await?; + } + } + + Ok(()) + } +} diff --git a/ng-net/src/actors/client/mod.rs b/ng-net/src/actors/client/mod.rs index 1c3723c..a2a6672 100644 --- a/ng-net/src/actors/client/mod.rs +++ b/ng-net/src/actors/client/mod.rs @@ -5,3 +5,5 @@ pub mod pin_repo; pub mod topic_sub; pub mod event; + +pub mod commit_get; diff --git a/ng-net/src/broker.rs b/ng-net/src/broker.rs index 4f3b72b..d586f4e 100644 --- a/ng-net/src/broker.rs +++ b/ng-net/src/broker.rs @@ -23,7 +23,6 @@ use async_std::sync::{Arc, RwLock}; use either::Either; use futures::channel::mpsc; use futures::SinkExt; -use ng_repo::block_storage::HashMapBlockStorage; use ng_repo::errors::*; use ng_repo::log::*; use ng_repo::object::Object; @@ -452,10 +451,10 @@ impl<'a> Broker<'a> { // #[cfg(not(target_arch = "wasm32"))] // pub fn test_storage(&self, path: PathBuf) { - // use ng_storage_rocksdb::kcv_store::RocksdbKCVStorage; + // use ng_storage_rocksdb::kcv_store::RocksDbKCVStorage; // let key: [u8; 32] = [0; 32]; - // let test_storage = RocksdbKCVStorage::open(&path, key); + // let test_storage = RocksDbKCVStorage::open(&path, key); // match test_storage { // Err(e) => { // log_debug!("storage error {}", e); diff --git a/ng-net/src/connection.rs b/ng-net/src/connection.rs index 7608f61..0d9c882 100644 --- a/ng-net/src/connection.rs +++ b/ng-net/src/connection.rs @@ -300,7 +300,7 @@ impl NoiseFSM { if in_reply_to != 0 { msg.set_id(in_reply_to); } - log_debug!("SENDING: {:?}", msg); + log_info!("SENDING: {:?}", msg); if self.noise_cipher_state_enc.is_some() { let cipher = self.encrypt(msg)?; self.sender diff --git a/ng-net/src/server_storage.rs b/ng-net/src/server_storage.rs index a3f2730..ef14fb7 100644 --- a/ng-net/src/server_storage.rs +++ b/ng-net/src/server_storage.rs @@ -58,4 +58,6 @@ pub trait ServerStorage: Send + Sync { topic: &TopicId, publisher: Option<&PublisherAdvert>, ) -> Result; + + fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result, ServerError>; } diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index f9cd7a0..205a3bd 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -2924,6 +2924,39 @@ impl BlockGet { } } +/// Request a Commit by ID +/// +/// commit_header_key is always set to None in the reply when request is made on OuterOverlay of protected or Group overlays +/// The difference with BlockGet is that the Broker will try to return all the commit blocks as they were sent in the Pub/Sub Event, if it has it. +/// This will help in having all the blocks (including the header and body blocks), while a BlockGet would inevitably return only the blocks of the ObjectContent, +/// and not the header nor the body. And the load() would fail with CommitLoadError::MissingBlocks. That's what happens when the Commit is not present in the pubsub, +/// and we need to default to using BlockGet instead. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CommitGetV0 { + /// Block IDs to request + pub id: ObjectId, + + /// Topic the commit is referenced from, if it is known by the requester. + /// can be used to do a BlockSearchTopic in the core overlay. + pub topic: Option, + + #[serde(skip)] + pub overlay: Option, +} + +/// Request a Commit by ID (see [CommitGetV0] for more details) +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum CommitGet { + V0(CommitGetV0), +} +impl CommitGet { + pub fn id(&self) -> &ObjectId { + match self { + CommitGet::V0(o) => &o.id, + } + } +} + /// Request to store one or more blocks #[derive(Clone, Debug, Serialize, Deserialize)] pub struct BlocksPutV0 { @@ -3052,6 +3085,7 @@ pub enum ClientRequestContentV0 { BlocksExist(BlocksExist), BlockGet(BlockGet), + CommitGet(CommitGet), TopicSyncReq(TopicSyncReq), // For Pinned Repos only : @@ -3071,6 +3105,7 @@ impl ClientRequestContentV0 { ClientRequestContentV0::TopicSub(a) => a.set_overlay(overlay), ClientRequestContentV0::PinRepo(a) => {} ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay), + ClientRequestContentV0::CommitGet(a) => a.set_overlay(overlay), _ => unimplemented!(), } } @@ -3117,6 +3152,7 @@ impl ClientRequest { ClientRequestContentV0::PinRepo(r) => r.get_actor(self.id()), ClientRequestContentV0::TopicSub(r) => r.get_actor(self.id()), ClientRequestContentV0::PublishEvent(r) => r.get_actor(self.id()), + ClientRequestContentV0::CommitGet(r) => r.get_actor(self.id()), _ => unimplemented!(), }, } @@ -3251,6 +3287,14 @@ pub struct ClientResponseV0 { pub content: ClientResponseContentV0, } +impl ClientResponse { + pub fn set_result(&mut self, res: u16) { + match self { + Self::V0(v0) => v0.result = res, + } + } +} + /// Response to a `ClientRequest` #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ClientResponse { @@ -3280,7 +3324,7 @@ where } impl From<()> for ProtocolMessage { - fn from(msg: ()) -> ProtocolMessage { + fn from(_msg: ()) -> ProtocolMessage { let cm: ClientResponse = ServerError::Ok.into(); cm.into() } @@ -3327,7 +3371,8 @@ impl TryFrom for ClientResponseContentV0 { .. })) = msg { - if res == 0 { + let err = ServerError::try_from(res).unwrap(); + if !err.is_err() { Ok(content) } else { Err(ProtocolError::ServerError) @@ -3771,6 +3816,16 @@ impl ProtocolMessage { } } +impl From for ClientResponse { + fn from(msg: ClientResponseContentV0) -> ClientResponse { + ClientResponse::V0(ClientResponseV0 { + id: 0, + result: 0, + content: msg, + }) + } +} + impl From for ProtocolMessage { fn from(msg: ClientResponseContentV0) -> ProtocolMessage { let client_res = ClientResponse::V0(ClientResponseV0 { diff --git a/ng-repo/src/block.rs b/ng-repo/src/block.rs index 8f271c8..5ba0be7 100644 --- a/ng-repo/src/block.rs +++ b/ng-repo/src/block.rs @@ -43,6 +43,10 @@ impl BlockV0 { b } + pub fn dummy() -> BlockV0 { + BlockV0::new(vec![], None, vec![], None) + } + pub fn new_random_access( children: Vec, content: Vec, @@ -115,6 +119,10 @@ impl Block { Block::V0(BlockV0::new(children, header_ref, content, key)) } + pub fn dummy() -> Block { + Block::V0(BlockV0::dummy()) + } + pub fn new_random_access( children: Vec, content: Vec, diff --git a/ng-repo/src/block_storage.rs b/ng-repo/src/block_storage.rs index bf46f6d..75a5d61 100644 --- a/ng-repo/src/block_storage.rs +++ b/ng-repo/src/block_storage.rs @@ -25,6 +25,11 @@ pub trait BlockStorage: Send + Sync { /// Load a block from the storage. fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result; + // fetch a block from broker or core overlay + // pub async fn fetch(&self, id: &BlockId) -> Result { + // todo!(); + // } + /// Save a block to the storage. fn put(&self, overlay: &OverlayId, block: &Block) -> Result; diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs index 933ef1e..961acf7 100644 --- a/ng-repo/src/branch.rs +++ b/ng-repo/src/branch.rs @@ -125,7 +125,7 @@ impl Branch { //log_debug!(" target_heads: {:?}", target_heads); //log_debug!(" known_heads: {:?}", known_heads); - /// Load causal past of a Commit `cobj` in a `Branch` from the `BlockStorage`, + /// Load causal past of a Commit `cobj` in a `Branch` from the `Store`, /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG. /// optionally collecting the missing objects/blocks that couldn't be found locally on the way fn load_causal_past( diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs index 5352e4a..12171ad 100644 --- a/ng-repo/src/commit.rs +++ b/ng-repo/src/commit.rs @@ -30,13 +30,12 @@ use std::iter::FromIterator; pub enum CommitLoadError { MissingBlocks(Vec), ObjectParseError, - NotACommitError, - NotACommitBodyError, + NotACommit, + NotACommitBody, CannotBeAtRootOfBranch, MustBeAtRootOfBranch, SingletonCannotHaveHeader, - BodyLoadError(Vec), - HeaderLoadError, + MalformedHeader, BodyTypeMismatch, } @@ -45,7 +44,6 @@ pub enum CommitVerifyError { InvalidSignature, InvalidHeader, PermissionDenied, - DepLoadError(CommitLoadError), } impl CommitV0 { @@ -308,13 +306,36 @@ impl Commit { ) -> Result { let (id, key) = (commit_ref.id, commit_ref.key); match Object::load(id, Some(key.clone()), store) { + Err(ObjectParseError::MissingHeaderBlocks((obj, mut missing))) => { + if with_body { + let content = obj + .content() + .map_err(|_e| CommitLoadError::ObjectParseError)?; + let mut commit = match content { + ObjectContent::V0(ObjectContentV0::Commit(c)) => c, + _ => return Err(CommitLoadError::NotACommit), + }; + commit.set_id(id); + commit.set_key(key.clone()); + match commit.load_body(store) { + Ok(_) => return Err(CommitLoadError::MissingBlocks(missing)), + Err(CommitLoadError::MissingBlocks(mut missing_body)) => { + missing.append(&mut missing_body); + return Err(CommitLoadError::MissingBlocks(missing)); + } + Err(e) => return Err(e), + } + } else { + return Err(CommitLoadError::MissingBlocks(missing)); + } + } Ok(obj) => { let content = obj .content() .map_err(|_e| CommitLoadError::ObjectParseError)?; let mut commit = match content { ObjectContent::V0(ObjectContentV0::Commit(c)) => c, - _ => return Err(CommitLoadError::NotACommitError), + _ => return Err(CommitLoadError::NotACommit), }; commit.set_id(id); commit.set_key(key.clone()); @@ -341,7 +362,7 @@ impl Commit { let content = self.content_v0(); let (id, key) = (content.body.id, content.body.key.clone()); let obj = Object::load(id.clone(), Some(key.clone()), store).map_err(|e| match e { - ObjectParseError::MissingBlocks(missing) => CommitLoadError::BodyLoadError(missing), + ObjectParseError::MissingBlocks(missing) => CommitLoadError::MissingBlocks(missing), _ => CommitLoadError::ObjectParseError, })?; let content = obj @@ -352,7 +373,7 @@ impl Commit { self.set_body(body); Ok(self.body().unwrap()) } - _ => Err(CommitLoadError::NotACommitBodyError), + _ => Err(CommitLoadError::NotACommitBody), } } @@ -448,7 +469,7 @@ impl Commit { // load deps (the previous RootBranch commit) let deps = self.deps(); if deps.len() != 1 { - Err(CommitLoadError::HeaderLoadError) + Err(CommitLoadError::MalformedHeader) } else { let previous_rootbranch_commit = Commit::load(deps[0].clone(), store, true)?; let previous_rootbranch = previous_rootbranch_commit @@ -472,7 +493,7 @@ impl Commit { return Ok(false); } } - Err(CommitLoadError::HeaderLoadError) + Err(CommitLoadError::MalformedHeader) } CommitBody::V0(CommitBodyV0::Delete(_)) => Ok(true), _ => Ok(false), @@ -623,7 +644,7 @@ impl Commit { ) -> Result, CommitLoadError> { //log_debug!(">> verify_full_object_refs_of_branch_at_commit: #{}", self.seq()); - /// Load `Commit`s of a `Branch` from the `BlockStorage` starting from the given `Commit`, + /// Load `Commit`s of a `Branch` from the `Store` starting from the given `Commit`, /// and collect missing `ObjectId`s fn load_direct_object_refs( commit: &Commit, @@ -658,7 +679,7 @@ impl Commit { Err(CommitLoadError::MissingBlocks(m)) => { // The commit body is missing. missing.extend(m.clone()); - Err(CommitLoadError::BodyLoadError(m)) + Err(CommitLoadError::MissingBlocks(m)) } Err(e) => Err(e), }?; @@ -1551,14 +1572,11 @@ mod test { let obj = Object::new(content.clone(), None, max_object_size, &store); - let hashmap_storage = HashMapBlockStorage::new(); - let storage = Box::new(hashmap_storage); - _ = obj.save(&store).expect("save object"); let commit = Commit::load(obj.reference().unwrap(), &store, false); - assert_eq!(commit, Err(CommitLoadError::NotACommitError)); + assert_eq!(commit, Err(CommitLoadError::NotACommit)); } #[test] @@ -1649,7 +1667,7 @@ mod test { // match commit.load_body(repo.store.unwrap()) { // Ok(_b) => panic!("Body should not exist"), - // Err(CommitLoadError::BodyLoadError(missing)) => { + // Err(CommitLoadError::MissingBlocks(missing)) => { // assert_eq!(missing.len(), 1); // } // Err(e) => panic!("Commit load error: {:?}", e), @@ -1658,7 +1676,7 @@ mod test { commit.verify_sig(&repo).expect("verify signature"); match commit.verify_perm(&repo) { Ok(_) => panic!("Commit should not be Ok"), - Err(NgError::CommitLoadError(CommitLoadError::BodyLoadError(missing))) => { + Err(NgError::CommitLoadError(CommitLoadError::MissingBlocks(missing))) => { assert_eq!(missing.len(), 1); } Err(e) => panic!("Commit verify perm error: {:?}", e), @@ -1666,7 +1684,7 @@ mod test { // match commit.verify_full_object_refs_of_branch_at_commit(repo.store.unwrap()) { // Ok(_) => panic!("Commit should not be Ok"), - // Err(CommitLoadError::BodyLoadError(missing)) => { + // Err(CommitLoadError::MissingBlocks(missing)) => { // assert_eq!(missing.len(), 1); // } // Err(e) => panic!("Commit verify error: {:?}", e), @@ -1674,7 +1692,7 @@ mod test { match commit.verify(&repo) { Ok(_) => panic!("Commit should not be Ok"), - Err(NgError::CommitLoadError(CommitLoadError::BodyLoadError(missing))) => { + Err(NgError::CommitLoadError(CommitLoadError::MissingBlocks(missing))) => { assert_eq!(missing.len(), 1); } Err(e) => panic!("Commit verify error: {:?}", e), diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 3cd5bc3..535e61c 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -10,6 +10,7 @@ //! Errors pub use crate::commit::{CommitLoadError, CommitVerifyError}; +use crate::object::Object; use num_enum::IntoPrimitive; use num_enum::TryFromPrimitive; @@ -35,8 +36,10 @@ pub enum NgError { PermissionDenied, InvalidPazzle, CommitLoadError(CommitLoadError), + ObjectParseError(ObjectParseError), StorageError(StorageError), NotFound, + JsStorageKeyNotFound, IoError, CommitVerifyError(CommitVerifyError), LocalBrokerNotInitialized, @@ -62,6 +65,7 @@ pub enum NgError { InvalidResponse, NotAServerError, VerifierError(VerifierError), + SiteNotFoundOnBroker, } impl Error for NgError {} @@ -70,6 +74,7 @@ impl fmt::Display for NgError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::WalletError(string) => write!(f, "WalletError: {}", string), + Self::JsStorageWriteError(string) => write!(f, "JsStorageWriteError: {}", string), _ => write!(f, "{:?}", self), } } @@ -149,7 +154,18 @@ impl From for NgError { impl From for NgError { fn from(e: VerifierError) -> Self { - NgError::VerifierError(e) + match e { + VerifierError::InvalidKey => NgError::InvalidKey, + VerifierError::SerializationError => NgError::SerializationError, + VerifierError::CommitLoadError(e) => NgError::CommitLoadError(e), + VerifierError::StorageError(e) => NgError::StorageError(e), + VerifierError::ObjectParseError(e) => NgError::ObjectParseError(e), + VerifierError::TopicNotFound => NgError::TopicNotFound, + VerifierError::RepoNotFound => NgError::RepoNotFound, + VerifierError::StoreNotFound => NgError::StoreNotFound, + VerifierError::BranchNotFound => NgError::BranchNotFound, + _ => NgError::VerifierError(e), + } } } @@ -172,6 +188,8 @@ pub enum ObjectParseError { BlockDeserializeError, /// Error deserializing content of the object ObjectDeserializeError, + + MissingHeaderBlocks((Object, Vec)), } #[derive(Debug, PartialEq, Eq, Clone)] @@ -211,6 +229,8 @@ pub enum ServerError { SequenceMismatch, FileError, RepoAlreadyOpened, + NotFound, + EmptyStream, } impl ServerError { @@ -218,7 +238,7 @@ impl ServerError { *self == ServerError::PartialContent || *self == ServerError::EndOfStream } pub fn is_err(&self) -> bool { - *self != ServerError::Ok + *self != ServerError::Ok && !self.is_stream() } } @@ -228,6 +248,7 @@ pub enum VerifierError { MissingCommitInDag, CommitBodyNotFound, InvalidKey, + SerializationError, OtherError(String), CommitLoadError(CommitLoadError), InvalidRepositoryCommit, @@ -240,6 +261,8 @@ pub enum VerifierError { MalformedSyncSignatureDeps, TopicNotFound, RepoNotFound, + StoreNotFound, + BranchNotFound, InvalidBranch, NoBlockStorageAvailable, RootBranchNotFound, @@ -250,6 +273,12 @@ impl From for VerifierError { match e { NgError::InvalidKey => VerifierError::InvalidKey, NgError::RepoNotFound => VerifierError::RepoNotFound, + NgError::BranchNotFound => VerifierError::BranchNotFound, + NgError::SerializationError => VerifierError::SerializationError, + // NgError::JsStorageReadError + // NgError::JsStorageWriteError(String) + // NgError::JsStorageKeyNotFound + // NgError::InvalidFileFormat _ => VerifierError::OtherError(e.to_string()), } } diff --git a/ng-repo/src/event.rs b/ng-repo/src/event.rs index 698ee3e..3587cb8 100644 --- a/ng-repo/src/event.rs +++ b/ng-repo/src/event.rs @@ -213,7 +213,7 @@ impl EventV0 { // first_id = Some(id) // } // } - // first_id.ok_or(NgError::CommitLoadError(CommitLoadError::NotACommitError)) + // first_id.ok_or(NgError::CommitLoadError(CommitLoadError::NotACommit)) // } /// opens an event with the key derived from information kept in Repo. diff --git a/ng-repo/src/object.rs b/ng-repo/src/object.rs index 9c7e4f1..4e23da7 100644 --- a/ng-repo/src/object.rs +++ b/ng-repo/src/object.rs @@ -40,7 +40,7 @@ pub const DATA_VARINT_EXTRA: usize = 4; pub const BLOCK_MAX_DATA_EXTRA: usize = 4; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Clone)] /// An Object in memory. This is not used to serialize data pub struct Object { /// keeps the deduplicated blocks of the Object @@ -491,13 +491,29 @@ impl Object { panic!("shouldn't happen") } CommitHeaderObject::Id(id) => { - let obj = Object::load(id, Some(header_ref.key.clone()), store)?; - match obj.content()? { - ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { - commit_header.set_id(id); - (Some(commit_header), Some(obj.blocks().cloned().collect())) + let obj_res = Object::load(id, Some(header_ref.key.clone()), store); + match obj_res { + Err(ObjectParseError::MissingBlocks(m)) => { + return Err(ObjectParseError::MissingHeaderBlocks(( + Object { + blocks, + block_contents, + header: None, + header_blocks: vec![], + #[cfg(test)] + already_saved: false, + }, + m, + ))); } - _ => return Err(ObjectParseError::InvalidHeader), + Err(e) => return Err(e), + Ok(obj) => match obj.content()? { + ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { + commit_header.set_id(id); + (Some(commit_header), Some(obj.blocks().cloned().collect())) + } + _ => return Err(ObjectParseError::InvalidHeader), + }, } } CommitHeaderObject::EncryptedContent(content) => { diff --git a/ng-storage-rocksdb/src/kcv_storage.rs b/ng-storage-rocksdb/src/kcv_storage.rs index 7b64fe6..db4bae3 100644 --- a/ng-storage-rocksdb/src/kcv_storage.rs +++ b/ng-storage-rocksdb/src/kcv_storage.rs @@ -23,7 +23,7 @@ use rocksdb::{ }; pub struct RocksdbTransaction<'a> { - store: &'a RocksdbKCVStorage, + store: &'a RocksDbKCVStorage, tx: Option>, } @@ -64,7 +64,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> { family: &Option, ) -> Result, Vec)>, StorageError> { let property_start = - RocksdbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix); + RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix); let iter = self.get_iterator(&property_start, &family)?; self.store .get_all_keys_and_values_(prefix, key_size, key_prefix, suffix, iter) @@ -79,7 +79,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> { ) -> Result>, StorageError> { let key_size = key.len(); let prop_values = self.get_all_keys_and_values(prefix, key_size, key, None, family)?; - Ok(RocksdbKCVStorage::get_all_properties_of_key( + Ok(RocksDbKCVStorage::get_all_properties_of_key( prop_values, key_size, &properties, @@ -94,7 +94,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> { suffix: Option, family: &Option, ) -> Result, StorageError> { - let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); + let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix); let res = match family { Some(cf) => self.tx().get_for_update_cf( self.store @@ -152,7 +152,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { value: &Vec, family: &Option, ) -> Result<(), StorageError> { - let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); + let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix); match family { Some(cf) => self.tx().put_cf( self.store @@ -189,7 +189,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { suffix: Option, family: &Option, ) -> Result<(), StorageError> { - let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); + let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix); let res = match family { Some(cf) => self.tx().delete_cf( self.store @@ -253,7 +253,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { family: &Option, ) -> Result<(), StorageError> { let key_size = key.len() + property_size; - let property_start = RocksdbKCVStorage::calc_key_start(prefix, key_size, &key, &suffix); + let property_start = RocksDbKCVStorage::calc_key_start(prefix, key_size, &key, &suffix); let mut iter = self.get_iterator(&property_start, &family)?; let mut vec_key_end = key.clone(); @@ -261,7 +261,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { vec_key_end.append(&mut trailing_max); // let property_start = Self::compute_property(prefix, &vec_key_start, suffix); - let property_end = RocksdbKCVStorage::compute_property( + let property_end = RocksDbKCVStorage::compute_property( prefix, &vec_key_end, &Some(suffix.unwrap_or(255u8)), @@ -299,7 +299,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { } } -pub struct RocksdbKCVStorage { +pub struct RocksDbKCVStorage { /// the main store where all the properties of keys are stored db: TransactionDB, /// path for the storage backend data @@ -320,7 +320,7 @@ fn compare(a: &[T], b: &[T]) -> std::cmp::Ordering { return a.len().cmp(&b.len()); } -impl ReadTransaction for RocksdbKCVStorage { +impl ReadTransaction for RocksDbKCVStorage { /// returns a list of (key,value) that are in the range specified in the request fn get_all_keys_and_values( &self, @@ -407,7 +407,7 @@ impl ReadTransaction for RocksdbKCVStorage { } } -impl KCVStorage for RocksdbKCVStorage { +impl KCVStorage for RocksDbKCVStorage { fn write_transaction( &self, method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>, @@ -427,7 +427,7 @@ impl KCVStorage for RocksdbKCVStorage { } } -impl WriteTransaction for RocksdbKCVStorage { +impl WriteTransaction for RocksDbKCVStorage { /// Save a property value to the store. fn put( &self, @@ -508,7 +508,7 @@ impl WriteTransaction for RocksdbKCVStorage { } } -impl RocksdbKCVStorage { +impl RocksDbKCVStorage { pub fn path(&self) -> PathBuf { PathBuf::from(&self.path) } @@ -652,7 +652,7 @@ impl RocksdbKCVStorage { /// Opens the store and returns a KCVStorage object that should be kept and used to manipulate the properties /// The key is the encryption key for the data at rest. - pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result { + pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result { let mut opts = Options::default(); //opts.set_use_fsync(true); opts.create_if_missing(true); @@ -668,7 +668,7 @@ impl RocksdbKCVStorage { Env::version() ); - Ok(RocksdbKCVStorage { + Ok(RocksDbKCVStorage { db: db, path: path.to_str().unwrap().to_string(), }) diff --git a/ng-verifier/Cargo.toml b/ng-verifier/Cargo.toml index a5259ab..4da82ce 100644 --- a/ng-verifier/Cargo.toml +++ b/ng-verifier/Cargo.toml @@ -34,6 +34,7 @@ threshold_crypto = "0.4.0" rand = { version = "0.7", features = ["getrandom"] } web-time = "0.2.0" either = "1.8.1" +futures = "0.3.24" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" } diff --git a/ng-verifier/src/rocksdb_user_storage.rs b/ng-verifier/src/rocksdb_user_storage.rs index 6d6139e..11fe899 100644 --- a/ng-verifier/src/rocksdb_user_storage.rs +++ b/ng-verifier/src/rocksdb_user_storage.rs @@ -17,7 +17,7 @@ use ng_repo::block_storage::BlockStorage; use ng_repo::repo::{BranchInfo, Repo}; use ng_repo::store::Store; use ng_repo::{errors::StorageError, types::*}; -use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; +use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::{ @@ -27,13 +27,13 @@ use std::{ }; pub(crate) struct RocksDbUserStorage { - user_storage: RocksdbKCVStorage, + user_storage: RocksDbKCVStorage, } impl RocksDbUserStorage { pub fn open(path: &PathBuf, master_key: [u8; 32]) -> Result { Ok(RocksDbUserStorage { - user_storage: RocksdbKCVStorage::open(path, master_key)?, + user_storage: RocksDbKCVStorage::open(path, master_key)?, }) } } diff --git a/ng-verifier/src/types.rs b/ng-verifier/src/types.rs index b3ff1b4..69fd576 100644 --- a/ng-verifier/src/types.rs +++ b/ng-verifier/src/types.rs @@ -105,6 +105,7 @@ impl fmt::Debug for JsSaveSessionConfig { pub enum VerifierConfigType { /// nothing will be saved on disk during the session Memory, + /// only the session information is saved locally. the UserStorage is not saved. JsSaveSession(JsSaveSessionConfig), /// will save all user data locally, with RocksDb backend RocksDb(PathBuf), @@ -130,6 +131,13 @@ impl VerifierConfigType { _ => false, } } + + pub(crate) fn is_in_memory(&self) -> bool { + match self { + Self::Memory | Self::JsSaveSession(_) => true, + _ => false, + } + } } #[derive(Debug)] diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index c724d11..8254f07 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -10,6 +10,7 @@ //! Repo object (on heap) to handle a Repository use crate::commits::*; use crate::types::*; +use async_std::stream::StreamExt; use ng_net::actor::SoS; use ng_net::broker::{Broker, BROKER}; use ng_repo::log::*; @@ -35,7 +36,7 @@ use core::fmt; #[cfg(not(target_family = "wasm"))] use crate::rocksdb_user_storage::RocksDbUserStorage; use crate::user_storage::{InMemoryUserStorage, UserStorage}; -use async_std::sync::{Mutex, RwLockWriteGuard}; +use async_std::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard}; use std::{collections::HashMap, path::PathBuf, sync::Arc}; use ng_net::{ @@ -169,8 +170,25 @@ impl Verifier { self.config.config_type.is_persistent() } + fn is_in_memory(&self) -> bool { + self.config.config_type.is_in_memory() + } + + fn get_arc_block_storage( + &self, + ) -> Result>, VerifierError> { + Ok(Arc::clone( + self.block_storage + .as_ref() + .ok_or(VerifierError::NoBlockStorageAvailable)?, + )) + } + pub fn get_store_or_load(&mut self, store_repo: &StoreRepo) -> Arc { let overlay_id = store_repo.overlay_id_for_storage_purpose(); + let block_storage = self + .get_arc_block_storage() + .expect("get_store_or_load cannot be called on Remote Verifier"); let store = self.stores.entry(overlay_id).or_insert_with(|| { // FIXME: get store_readcap and store_overlay_branch_readcap from user storage let store_readcap = ReadCap::nil(); @@ -179,13 +197,7 @@ impl Verifier { *store_repo, store_readcap, store_overlay_branch_readcap, - Arc::clone( - &self - .block_storage - .as_ref() - .ok_or(core::fmt::Error) - .expect("get_store_or_load cannot be called on Remote Verifier"), - ), + block_storage, ); Arc::new(store) }); @@ -260,9 +272,12 @@ impl Verifier { Ok(()) } - pub fn get_store(&self, store_repo: &StoreRepo) -> Result, NgError> { + pub fn get_store(&self, store_repo: &StoreRepo) -> Result, VerifierError> { let overlay_id = store_repo.overlay_id_for_storage_purpose(); - let store = self.stores.get(&overlay_id).ok_or(NgError::StoreNotFound)?; + let store = self + .stores + .get(&overlay_id) + .ok_or(VerifierError::StoreNotFound)?; Ok(Arc::clone(store)) } @@ -303,12 +318,15 @@ impl Verifier { repo_id: &RepoId, branch_id: &BranchId, current_heads: Vec, - ) -> Result<(), NgError> { - let repo = self.repos.get_mut(repo_id).ok_or(NgError::RepoNotFound)?; + ) -> Result<(), VerifierError> { + let repo = self + .repos + .get_mut(repo_id) + .ok_or(VerifierError::RepoNotFound)?; let branch = repo .branches .get_mut(branch_id) - .ok_or(NgError::BranchNotFound)?; + .ok_or(VerifierError::BranchNotFound)?; branch.current_heads = current_heads; Ok(()) } @@ -498,7 +516,7 @@ impl Verifier { if self.connected_server_id.is_some() { // send the event to the server already - let broker = BROKER.write().await; + let broker = BROKER.read().await; let user = self.config.user_priv_key.to_pub(); let remote = self.connected_server_id.to_owned().unwrap(); self.send_event(event, &broker, &user, &remote, overlay) @@ -546,7 +564,7 @@ impl Verifier { async fn send_event<'a>( &mut self, event: Event, - broker: &RwLockWriteGuard<'a, Broker<'a>>, + broker: &RwLockReadGuard<'a, Broker<'a>>, user: &UserId, remote: &DirectPeerId, overlay: OverlayId, @@ -665,20 +683,26 @@ impl Verifier { } } + fn user_storage_if_persistent(&self) -> Option>> { + if self.is_persistent() { + if let Some(us) = self.user_storage.as_ref() { + Some(Arc::clone(us)) + } else { + None + } + } else { + None + } + } + pub(crate) fn add_branch_and_save( &mut self, repo_id: &RepoId, branch_info: BranchInfo, store_repo: &StoreRepo, ) -> Result<(), VerifierError> { - let user_storage = self - .user_storage - .as_ref() - .map(|us| Arc::clone(us)) - .and_then(|u| if self.is_persistent() { Some(u) } else { None }); - - if user_storage.is_some() { - user_storage.unwrap().add_branch(repo_id, &branch_info)?; + if let Some(user_storage) = self.user_storage_if_persistent() { + user_storage.add_branch(repo_id, &branch_info)?; } let branch_id = branch_info.id.clone(); let topic_id = branch_info.topic.clone(); @@ -702,56 +726,82 @@ impl Verifier { branch_id: &BranchId, store_repo: &StoreRepo, ) -> Result<(), VerifierError> { - let user_storage = self - .user_storage - .as_ref() - .map(|us| Arc::clone(us)) - .and_then(|u| if self.is_persistent() { Some(u) } else { None }); - if user_storage.is_some() { + if let Some(user_storage) = self.user_storage_if_persistent() { let repo = self.get_repo(repo_id, store_repo)?; - user_storage - .unwrap() - .add_branch(repo_id, repo.branch(branch_id)?)?; + user_storage.add_branch(repo_id, repo.branch(branch_id)?)?; } Ok(()) } pub(crate) fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), VerifierError> { - let user_storage = self - .user_storage - .as_ref() - .map(|us| Arc::clone(us)) - .and_then(|u| if self.is_persistent() { Some(u) } else { None }); - if user_storage.is_some() { - user_storage.unwrap().update_signer_cap(signer_cap)?; + if let Some(user_storage) = self.user_storage_if_persistent() { + user_storage.update_signer_cap(signer_cap)?; } Ok(()) } pub(crate) fn add_repo_and_save(&mut self, repo: Repo) -> &Repo { - let user_storage = self - .user_storage - .as_ref() - .map(|us| Arc::clone(us)) - .and_then(|u| if self.is_persistent() { Some(u) } else { None }); + let us = self.user_storage_if_persistent(); let repo_ref: &Repo = self.add_repo_(repo); // save in user_storage - if user_storage.is_some() { - let _ = user_storage.unwrap().save_repo(repo_ref); + if let Some(user_storage) = us { + let _ = user_storage.save_repo(repo_ref); } repo_ref } - pub(crate) fn get_repo(&self, id: &RepoId, store_repo: &StoreRepo) -> Result<&Repo, NgError> { + pub(crate) fn get_repo( + &self, + id: &RepoId, + store_repo: &StoreRepo, + ) -> Result<&Repo, VerifierError> { //let store = self.get_store(store_repo); - let repo_ref = self.repos.get(id).ok_or(NgError::RepoNotFound); + let repo_ref = self.repos.get(id).ok_or(VerifierError::RepoNotFound); repo_ref } + pub async fn bootstrap(&mut self) -> Result<(), NgError> { + if self.is_in_memory() { + // TODO only bootstrap if 3P stores of personal site not already loaded (by replay) + + let broker = BROKER.read().await; + let user = self.config.user_priv_key.to_pub(); + let remote = self.connected_server_id.to_owned().unwrap(); + let read_cap = self.config.private_store_read_cap.as_ref().unwrap(); + // first we fetch the read_cap commit of private store repo. + let msg = CommitGet::V0(CommitGetV0 { + id: read_cap.id, + topic: None, // we dont have the topic (only available from RepoLink/BranchLink) but we are pretty sure the Broker has the commit anyway. + overlay: Some(OverlayId::outer( + self.config.private_store_id.as_ref().unwrap(), + )), + }); + match broker + .request::(&user, &remote, msg) + .await + { + Err(NgError::ServerError(ServerError::NotFound)) => { + // TODO: fallback to BlockGet, then Commit::load(with_body:true), which will return an Err(CommitLoadError::MissingBlocks), then do another BlockGet with those, and then again Commit::load... + return Err(NgError::SiteNotFoundOnBroker); + } + Ok(SoS::Stream(mut blockstream)) => { + while let Some(block) = blockstream.next().await { + log_info!("GOT BLOCK {:?}", block); + } + Ok(()) + } + Ok(_) => return Err(NgError::InvalidResponse), + Err(e) => return Err(e), + } + } else { + Ok(()) + } + } + fn load_from_credentials_and_events( &mut self, events: &Vec, - ) -> Result<(), NgError> { + ) -> Result<(), VerifierError> { let private_store_id = self.config.private_store_id.as_ref().unwrap(); let private_outer_overlay_id = OverlayId::outer(private_store_id); let private_inner_overlay_id = OverlayId::inner( @@ -767,13 +817,7 @@ impl Verifier { store_repo, self.config.private_store_read_cap.to_owned().unwrap(), self.config.private_store_read_cap.to_owned().unwrap(), - Arc::clone( - &self - .block_storage - .as_ref() - .ok_or(core::fmt::Error) - .expect("couldn't get the block_storage"), - ), + self.get_arc_block_storage()?, )); let store = self @@ -946,7 +990,7 @@ impl Verifier { pub async fn send_outbox(&mut self) -> Result<(), NgError> { let events: Vec = self.take_events_from_outbox()?; - let broker = BROKER.write().await; + let broker = BROKER.read().await; let user = self.config.user_priv_key.to_pub(); let remote = self .connected_server_id @@ -1191,14 +1235,7 @@ impl Verifier { &mut self, update: &StoreUpdate, ) -> Result<(), VerifierError> { - let store = Store::new_from( - update, - Arc::clone( - self.block_storage - .as_ref() - .ok_or(VerifierError::NoBlockStorageAvailable)?, - ), - ); + let store = Store::new_from(update, self.get_arc_block_storage()?); let overlay_id = store.get_store_repo().overlay_id_for_storage_purpose(); let store = self .stores @@ -1220,6 +1257,7 @@ impl Verifier { true => SymKey::nil(), }; let overlay_id = store_repo.overlay_id_for_storage_purpose(); + let block_storage = self.get_arc_block_storage()?; let store = self.stores.entry(overlay_id).or_insert_with(|| { let store_readcap = ReadCap::nil(); // temporarily set the store_overlay_branch_readcap to an objectRef that has an empty id, and a key = to the repo_write_cap_secret @@ -1229,13 +1267,7 @@ impl Verifier { *store_repo, store_readcap, store_overlay_branch_readcap, - Arc::clone( - &self - .block_storage - .as_ref() - .ok_or(core::fmt::Error) - .expect("get_store_or_load cannot be called on Remote Verifier"), - ), + block_storage, ); Arc::new(store) }); diff --git a/ngone/src/main.rs b/ngone/src/main.rs index be8e967..d00a7c7 100644 --- a/ngone/src/main.rs +++ b/ngone/src/main.rs @@ -28,7 +28,7 @@ use ng_net::types::{APP_NG_ONE_URL, NG_ONE_URL}; use ng_repo::log::*; use ng_repo::types::*; use ng_repo::utils::{generate_keypair, sign, verify}; -use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; +use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage; use ng_wallet::types::*; #[derive(RustEmbed)] @@ -36,7 +36,7 @@ use ng_wallet::types::*; struct Static; struct Server { - store: RocksdbKCVStorage, + store: RocksDbKCVStorage, } impl Server { @@ -158,7 +158,7 @@ async fn main() { let key: [u8; 32] = [0; 32]; log_debug!("data directory: {}", dir.to_str().unwrap()); fs::create_dir_all(dir.clone()).unwrap(); - let store = RocksdbKCVStorage::open(&dir, key); + let store = RocksDbKCVStorage::open(&dir, key); if store.is_err() { return; }