bootstrap the personal site on webapp

pull/19/head
Niko PLP 7 months ago
parent e132a3623c
commit 7141c65d09
  1. 1
      Cargo.lock
  2. 7
      nextgraph/src/local_broker.rs
  3. 4
      ng-broker/src/broker_storage/account.rs
  4. 14
      ng-broker/src/broker_storage/wallet.rs
  5. 50
      ng-broker/src/server_storage.rs
  6. 6
      ng-broker/src/server_ws.rs
  7. 6
      ng-net/src/actor.rs
  8. 125
      ng-net/src/actors/client/commit_get.rs
  9. 2
      ng-net/src/actors/client/mod.rs
  10. 5
      ng-net/src/broker.rs
  11. 2
      ng-net/src/connection.rs
  12. 2
      ng-net/src/server_storage.rs
  13. 59
      ng-net/src/types.rs
  14. 8
      ng-repo/src/block.rs
  15. 5
      ng-repo/src/block_storage.rs
  16. 2
      ng-repo/src/branch.rs
  17. 58
      ng-repo/src/commit.rs
  18. 33
      ng-repo/src/errors.rs
  19. 2
      ng-repo/src/event.rs
  20. 22
      ng-repo/src/object.rs
  21. 30
      ng-storage-rocksdb/src/kcv_storage.rs
  22. 1
      ng-verifier/Cargo.toml
  23. 6
      ng-verifier/src/rocksdb_user_storage.rs
  24. 8
      ng-verifier/src/types.rs
  25. 176
      ng-verifier/src/verifier.rs
  26. 6
      ngone/src/main.rs

1
Cargo.lock generated

@ -3421,6 +3421,7 @@ dependencies = [
"blake3", "blake3",
"chacha20", "chacha20",
"either", "either",
"futures",
"getrandom 0.2.10", "getrandom 0.2.10",
"ng-net", "ng-net",
"ng-repo", "ng-repo",

@ -113,7 +113,7 @@ impl JsStorageConfig {
let start_key = format!("ng_outboxes@{}@start", peer_id); let start_key = format!("ng_outboxes@{}@start", peer_id);
let res = (session_read4)(start_key.clone()); let res = (session_read4)(start_key.clone());
let _start = match res { let _start = match res {
Err(_) => return Err(NgError::NotFound), Err(_) => return Err(NgError::JsStorageKeyNotFound),
Ok(start_str) => start_str Ok(start_str) => start_str
.parse::<u64>() .parse::<u64>()
.map_err(|_| NgError::InvalidFileFormat)?, .map_err(|_| NgError::InvalidFileFormat)?,
@ -1310,6 +1310,11 @@ pub async fn user_connect_with_device_info(
log_info!("SENDING EVENTS FROM OUTBOX: {:?}", res); log_info!("SENDING EVENTS FROM OUTBOX: {:?}", res);
// TODO: load verifier from remote connection (if not RocksDb type) // TODO: load verifier from remote connection (if not RocksDb type)
if let Err(e) = session.verifier.bootstrap().await {
session.verifier.connected_server_id = None;
Broker::close_all_connections().await;
tried.as_mut().unwrap().3 = Some(e.to_string());
}
break; break;
} else { } else {

@ -248,7 +248,7 @@ mod test {
use ng_repo::errors::StorageError; use ng_repo::errors::StorageError;
use ng_repo::types::*; use ng_repo::types::*;
use ng_repo::utils::*; use ng_repo::utils::*;
use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
use std::fs; use std::fs;
use tempfile::Builder; use tempfile::Builder;
@ -261,7 +261,7 @@ mod test {
let key: [u8; 32] = [0; 32]; let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap(); fs::create_dir_all(root.path()).unwrap();
println!("{}", root.path().to_str().unwrap()); println!("{}", root.path().to_str().unwrap());
let mut store = RocksdbKCVStorage::open(root.path(), key).unwrap(); let mut store = RocksDbKCVStorage::open(root.path(), key).unwrap();
let user_id = PubKey::Ed25519PubKey([1; 32]); let user_id = PubKey::Ed25519PubKey([1; 32]);

@ -23,12 +23,14 @@ pub struct Wallet<'a> {
} }
impl<'a> Wallet<'a> { impl<'a> Wallet<'a> {
const PREFIX: u8 = b"w"[0]; const PREFIX: u8 = b'w';
const PREFIX_OVERLAY: u8 = b"o"[0]; const PREFIX_OVERLAY: u8 = b'o';
const PREFIX_USER: u8 = b"u"[0]; const PREFIX_USER: u8 = b'u';
const KEY_ACCOUNTS: [u8; 8] = *b"accounts"; const KEY_ACCOUNTS: [u8; 8] = *b"accounts";
const KEY_PEERS: [u8; 5] = *b"peers"; const KEY_PEERS: [u8; 5] = *b"peers";
const KEY_CORE: [u8; 4] = *b"core";
const KEY_BLOCKS: [u8; 6] = *b"blocks";
// propertie's suffixes // propertie's suffixes
const SYM_KEY: u8 = b"s"[0]; const SYM_KEY: u8 = b"s"[0];
@ -110,6 +112,12 @@ impl<'a> Wallet<'a> {
pub fn get_or_create_peers_key(&self) -> Result<SymKey, StorageError> { pub fn get_or_create_peers_key(&self) -> Result<SymKey, StorageError> {
self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec()) self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec())
} }
pub fn get_or_create_blocks_key(&self) -> Result<SymKey, StorageError> {
self.get_or_create_single_key(Self::PREFIX, &Self::KEY_BLOCKS.to_vec())
}
pub fn get_or_create_core_key(&self) -> Result<SymKey, StorageError> {
self.get_or_create_single_key(Self::PREFIX, &Self::KEY_CORE.to_vec())
}
pub fn get_or_create_accounts_key(&self) -> Result<SymKey, StorageError> { pub fn get_or_create_accounts_key(&self) -> Result<SymKey, StorageError> {
self.get_or_create_single_key(Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec()) self.get_or_create_single_key(Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec())
} }

@ -25,17 +25,20 @@ use ng_repo::errors::{ProtocolError, ServerError, StorageError};
use ng_repo::kcv_storage::KCVStorage; use ng_repo::kcv_storage::KCVStorage;
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::types::*; use ng_repo::types::*;
use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; use ng_storage_rocksdb::block_storage::RocksDbBlockStorage;
use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
pub struct RocksdbServerStorage { pub struct RocksDbServerStorage {
wallet_storage: RocksdbKCVStorage, wallet_storage: RocksDbKCVStorage,
accounts_storage: RocksdbKCVStorage, accounts_storage: RocksDbKCVStorage,
peers_storage: RocksdbKCVStorage, peers_storage: RocksDbKCVStorage,
peers_last_seq_path: PathBuf, peers_last_seq_path: PathBuf,
peers_last_seq: Mutex<HashMap<PeerId, u64>>, peers_last_seq: Mutex<HashMap<PeerId, u64>>,
block_storage: RocksDbBlockStorage,
core_storage: RocksDbKCVStorage,
} }
impl RocksdbServerStorage { impl RocksDbServerStorage {
pub fn open( pub fn open(
path: &mut PathBuf, path: &mut PathBuf,
master_key: SymKey, master_key: SymKey,
@ -47,7 +50,7 @@ impl RocksdbServerStorage {
std::fs::create_dir_all(wallet_path.clone()).unwrap(); std::fs::create_dir_all(wallet_path.clone()).unwrap();
log_debug!("opening wallet DB"); log_debug!("opening wallet DB");
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let wallet_storage = RocksdbKCVStorage::open(&wallet_path, master_key.slice().clone())?; let wallet_storage = RocksDbKCVStorage::open(&wallet_path, master_key.slice().clone())?;
let wallet = Wallet::open(&wallet_storage); let wallet = Wallet::open(&wallet_storage);
// create/open the ACCOUNTS storage // create/open the ACCOUNTS storage
@ -59,7 +62,7 @@ impl RocksdbServerStorage {
accounts_key = wallet.create_accounts_key()?; accounts_key = wallet.create_accounts_key()?;
std::fs::create_dir_all(accounts_path.clone()).unwrap(); std::fs::create_dir_all(accounts_path.clone()).unwrap();
let accounts_storage = let accounts_storage =
RocksdbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?;
let symkey = SymKey::random(); let symkey = SymKey::random();
let invite_code = InvitationCode::Admin(symkey.clone()); let invite_code = InvitationCode::Admin(symkey.clone());
let _ = Invitation::create( let _ = Invitation::create(
@ -87,7 +90,7 @@ impl RocksdbServerStorage {
std::fs::create_dir_all(accounts_path.clone()).unwrap(); std::fs::create_dir_all(accounts_path.clone()).unwrap();
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let accounts_storage = let accounts_storage =
RocksdbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?; RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?;
// create/open the PEERS storage // create/open the PEERS storage
log_debug!("opening peers DB"); log_debug!("opening peers DB");
@ -96,24 +99,42 @@ impl RocksdbServerStorage {
peers_path.push("peers"); peers_path.push("peers");
std::fs::create_dir_all(peers_path.clone()).unwrap(); std::fs::create_dir_all(peers_path.clone()).unwrap();
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way //TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let peers_storage = RocksdbKCVStorage::open(&peers_path, peers_key.slice().clone())?; let peers_storage = RocksDbKCVStorage::open(&peers_path, peers_key.slice().clone())?;
// creates the path for peers_last_seq // creates the path for peers_last_seq
let mut peers_last_seq_path = path.clone(); let mut peers_last_seq_path = path.clone();
peers_last_seq_path.push("peers_last_seq"); peers_last_seq_path.push("peers_last_seq");
std::fs::create_dir_all(peers_last_seq_path.clone()).unwrap(); std::fs::create_dir_all(peers_last_seq_path.clone()).unwrap();
Ok(RocksdbServerStorage { // opening block_storage
let mut blocks_path = path.clone();
blocks_path.push("blocks");
std::fs::create_dir_all(blocks_path.clone()).unwrap();
let blocks_key = wallet.get_or_create_blocks_key()?;
let block_storage = RocksDbBlockStorage::open(&blocks_path, *blocks_key.slice())?;
// create/open the PEERS storage
log_debug!("opening core DB");
let core_key = wallet.get_or_create_core_key()?;
let mut core_path = path.clone();
core_path.push("core");
std::fs::create_dir_all(core_path.clone()).unwrap();
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?;
Ok(RocksDbServerStorage {
wallet_storage, wallet_storage,
accounts_storage, accounts_storage,
peers_storage, peers_storage,
peers_last_seq_path, peers_last_seq_path,
peers_last_seq: Mutex::new(HashMap::new()), peers_last_seq: Mutex::new(HashMap::new()),
block_storage,
core_storage,
}) })
} }
} }
impl ServerStorage for RocksdbServerStorage { impl ServerStorage for RocksDbServerStorage {
fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> {
// for now we don't use the hashmap. // for now we don't use the hashmap.
// TODO: let's see if the lock is even needed // TODO: let's see if the lock is even needed
@ -248,4 +269,9 @@ impl ServerStorage for RocksdbServerStorage {
publisher: publisher.is_some(), publisher: publisher.is_some(),
})) }))
} }
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> {
//TODO: implement correctly !
Ok(vec![Block::dummy()])
}
} }

@ -12,7 +12,7 @@
//! WebSocket implementation of the Broker //! WebSocket implementation of the Broker
use crate::interfaces::*; use crate::interfaces::*;
use crate::server_storage::RocksdbServerStorage; use crate::server_storage::RocksDbServerStorage;
use crate::types::*; use crate::types::*;
use async_std::io::ReadExt; use async_std::io::ReadExt;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
@ -586,7 +586,7 @@ pub async fn run_server_accept_one(
// let master_key: [u8; 32] = [0; 32]; // let master_key: [u8; 32] = [0; 32];
// std::fs::create_dir_all(root.path()).unwrap(); // std::fs::create_dir_all(root.path()).unwrap();
// log_debug!("data directory: {}", root.path().to_str().unwrap()); // log_debug!("data directory: {}", root.path().to_str().unwrap());
// let store = RocksdbKCVStorage::open(root.path(), master_key); // let store = RocksDbKCVStorage::open(root.path(), master_key);
let socket = TcpListener::bind(addrs.as_str()).await?; let socket = TcpListener::bind(addrs.as_str()).await?;
log_debug!("Listening on {}", addrs.as_str()); log_debug!("Listening on {}", addrs.as_str());
@ -777,7 +777,7 @@ pub async fn run_server_v0(
std::fs::create_dir_all(path.clone()).unwrap(); std::fs::create_dir_all(path.clone()).unwrap();
// opening the server storage (that contains the encryption keys for each store/overlay ) // opening the server storage (that contains the encryption keys for each store/overlay )
let broker_storage = RocksdbServerStorage::open( let broker_storage = RocksDbServerStorage::open(
&mut path, &mut path,
wallet_master_key, wallet_master_key,
if admin_invite { if admin_invite {

@ -14,6 +14,7 @@
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use futures::{channel::mpsc, SinkExt}; use futures::{channel::mpsc, SinkExt};
use ng_repo::log::*;
use std::any::TypeId; use std::any::TypeId;
use std::sync::Arc; use std::sync::Arc;
@ -128,7 +129,10 @@ impl<
&& TypeId::of::<B>() != TypeId::of::<()>() && TypeId::of::<B>() != TypeId::of::<()>()
{ {
let (mut b_sender, b_receiver) = mpsc::unbounded::<B>(); let (mut b_sender, b_receiver) = mpsc::unbounded::<B>();
let response = msg.try_into().map_err(|_e| ProtocolError::ActorError)?; let response = msg.try_into().map_err(|e| {
log_err!("msg.try_into {}", e);
ProtocolError::ActorError
})?;
b_sender b_sender
.send(response) .send(response)
.await .await

@ -0,0 +1,125 @@
/*
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::{Block, OverlayId, PubKey};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
impl CommitGet {
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
Actor::<CommitGet, Block>::new_responder(id)
}
pub fn overlay(&self) -> &OverlayId {
match self {
Self::V0(v0) => v0.overlay.as_ref().unwrap(),
}
}
pub fn set_overlay(&mut self, overlay: OverlayId) {
match self {
Self::V0(v0) => v0.overlay = Some(overlay),
}
}
}
impl TryFrom<ProtocolMessage> for CommitGet {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
let req: ClientRequestContentV0 = msg.try_into()?;
if let ClientRequestContentV0::CommitGet(a) = req {
Ok(a)
} else {
log_debug!("INVALID {:?}", req);
Err(ProtocolError::InvalidValue)
}
}
}
impl From<CommitGet> for ProtocolMessage {
fn from(msg: CommitGet) -> ProtocolMessage {
let overlay = *msg.overlay();
ProtocolMessage::from_client_request_v0(ClientRequestContentV0::CommitGet(msg), overlay)
}
}
impl TryFrom<ProtocolMessage> for Block {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
let res: ClientResponseContentV0 = msg.try_into()?;
if let ClientResponseContentV0::Block(a) = res {
Ok(a)
} else {
log_debug!("INVALID {:?}", res);
Err(ProtocolError::InvalidValue)
}
}
}
impl From<Block> for ProtocolMessage {
fn from(b: Block) -> ProtocolMessage {
let mut cr: ClientResponse = ClientResponseContentV0::Block(b).into();
cr.set_result(ServerError::PartialContent.into());
cr.into()
}
}
impl Actor<'_, CommitGet, Block> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, CommitGet, Block> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
let req = CommitGet::try_from(msg)?;
log_info!("GOT CommitGet {:?}", req);
let broker = BROKER.read().await;
let blocks_res = broker
.get_server_storage()?
.get_commit(req.overlay(), req.id());
match blocks_res {
Ok(blocks) => {
if blocks.len() == 0 {
let re: Result<(), ServerError> = Err(ServerError::EmptyStream);
fsm.lock()
.await
.send_in_reply_to(re.into(), self.id())
.await?;
return Ok(());
}
let mut lock = fsm.lock().await;
for block in blocks {
lock.send_in_reply_to(block.into(), self.id()).await?;
}
let re: Result<(), ServerError> = Err(ServerError::EndOfStream);
lock.send_in_reply_to(re.into(), self.id()).await?;
}
Err(e) => {
let re: Result<(), ServerError> = Err(e);
fsm.lock()
.await
.send_in_reply_to(re.into(), self.id())
.await?;
}
}
Ok(())
}
}

@ -5,3 +5,5 @@ pub mod pin_repo;
pub mod topic_sub; pub mod topic_sub;
pub mod event; pub mod event;
pub mod commit_get;

@ -23,7 +23,6 @@ use async_std::sync::{Arc, RwLock};
use either::Either; use either::Either;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::SinkExt; use futures::SinkExt;
use ng_repo::block_storage::HashMapBlockStorage;
use ng_repo::errors::*; use ng_repo::errors::*;
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::object::Object; use ng_repo::object::Object;
@ -452,10 +451,10 @@ impl<'a> Broker<'a> {
// #[cfg(not(target_arch = "wasm32"))] // #[cfg(not(target_arch = "wasm32"))]
// pub fn test_storage(&self, path: PathBuf) { // pub fn test_storage(&self, path: PathBuf) {
// use ng_storage_rocksdb::kcv_store::RocksdbKCVStorage; // use ng_storage_rocksdb::kcv_store::RocksDbKCVStorage;
// let key: [u8; 32] = [0; 32]; // let key: [u8; 32] = [0; 32];
// let test_storage = RocksdbKCVStorage::open(&path, key); // let test_storage = RocksDbKCVStorage::open(&path, key);
// match test_storage { // match test_storage {
// Err(e) => { // Err(e) => {
// log_debug!("storage error {}", e); // log_debug!("storage error {}", e);

@ -300,7 +300,7 @@ impl NoiseFSM {
if in_reply_to != 0 { if in_reply_to != 0 {
msg.set_id(in_reply_to); msg.set_id(in_reply_to);
} }
log_debug!("SENDING: {:?}", msg); log_info!("SENDING: {:?}", msg);
if self.noise_cipher_state_enc.is_some() { if self.noise_cipher_state_enc.is_some() {
let cipher = self.encrypt(msg)?; let cipher = self.encrypt(msg)?;
self.sender self.sender

@ -58,4 +58,6 @@ pub trait ServerStorage: Send + Sync {
topic: &TopicId, topic: &TopicId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError>; ) -> Result<TopicSubRes, ServerError>;
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError>;
} }

@ -2924,6 +2924,39 @@ impl BlockGet {
} }
} }
/// Request a Commit by ID
///
/// commit_header_key is always set to None in the reply when request is made on OuterOverlay of protected or Group overlays
/// The difference with BlockGet is that the Broker will try to return all the commit blocks as they were sent in the Pub/Sub Event, if it has it.
/// This will help in having all the blocks (including the header and body blocks), while a BlockGet would inevitably return only the blocks of the ObjectContent,
/// and not the header nor the body. And the load() would fail with CommitLoadError::MissingBlocks. That's what happens when the Commit is not present in the pubsub,
/// and we need to default to using BlockGet instead.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommitGetV0 {
/// Block IDs to request
pub id: ObjectId,
/// Topic the commit is referenced from, if it is known by the requester.
/// can be used to do a BlockSearchTopic in the core overlay.
pub topic: Option<TopicId>,
#[serde(skip)]
pub overlay: Option<OverlayId>,
}
/// Request a Commit by ID (see [CommitGetV0] for more details)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum CommitGet {
V0(CommitGetV0),
}
impl CommitGet {
pub fn id(&self) -> &ObjectId {
match self {
CommitGet::V0(o) => &o.id,
}
}
}
/// Request to store one or more blocks /// Request to store one or more blocks
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BlocksPutV0 { pub struct BlocksPutV0 {
@ -3052,6 +3085,7 @@ pub enum ClientRequestContentV0 {
BlocksExist(BlocksExist), BlocksExist(BlocksExist),
BlockGet(BlockGet), BlockGet(BlockGet),
CommitGet(CommitGet),
TopicSyncReq(TopicSyncReq), TopicSyncReq(TopicSyncReq),
// For Pinned Repos only : // For Pinned Repos only :
@ -3071,6 +3105,7 @@ impl ClientRequestContentV0 {
ClientRequestContentV0::TopicSub(a) => a.set_overlay(overlay), ClientRequestContentV0::TopicSub(a) => a.set_overlay(overlay),
ClientRequestContentV0::PinRepo(a) => {} ClientRequestContentV0::PinRepo(a) => {}
ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay), ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay),
ClientRequestContentV0::CommitGet(a) => a.set_overlay(overlay),
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -3117,6 +3152,7 @@ impl ClientRequest {
ClientRequestContentV0::PinRepo(r) => r.get_actor(self.id()), ClientRequestContentV0::PinRepo(r) => r.get_actor(self.id()),
ClientRequestContentV0::TopicSub(r) => r.get_actor(self.id()), ClientRequestContentV0::TopicSub(r) => r.get_actor(self.id()),
ClientRequestContentV0::PublishEvent(r) => r.get_actor(self.id()), ClientRequestContentV0::PublishEvent(r) => r.get_actor(self.id()),
ClientRequestContentV0::CommitGet(r) => r.get_actor(self.id()),
_ => unimplemented!(), _ => unimplemented!(),
}, },
} }
@ -3251,6 +3287,14 @@ pub struct ClientResponseV0 {
pub content: ClientResponseContentV0, pub content: ClientResponseContentV0,
} }
impl ClientResponse {
pub fn set_result(&mut self, res: u16) {
match self {
Self::V0(v0) => v0.result = res,
}
}
}
/// Response to a `ClientRequest` /// Response to a `ClientRequest`
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientResponse { pub enum ClientResponse {
@ -3280,7 +3324,7 @@ where
} }
impl From<()> for ProtocolMessage { impl From<()> for ProtocolMessage {
fn from(msg: ()) -> ProtocolMessage { fn from(_msg: ()) -> ProtocolMessage {
let cm: ClientResponse = ServerError::Ok.into(); let cm: ClientResponse = ServerError::Ok.into();
cm.into() cm.into()
} }
@ -3327,7 +3371,8 @@ impl TryFrom<ProtocolMessage> for ClientResponseContentV0 {
.. ..
})) = msg })) = msg
{ {
if res == 0 { let err = ServerError::try_from(res).unwrap();
if !err.is_err() {
Ok(content) Ok(content)
} else { } else {
Err(ProtocolError::ServerError) Err(ProtocolError::ServerError)
@ -3771,6 +3816,16 @@ impl ProtocolMessage {
} }
} }
impl From<ClientResponseContentV0> for ClientResponse {
fn from(msg: ClientResponseContentV0) -> ClientResponse {
ClientResponse::V0(ClientResponseV0 {
id: 0,
result: 0,
content: msg,
})
}
}
impl From<ClientResponseContentV0> for ProtocolMessage { impl From<ClientResponseContentV0> for ProtocolMessage {
fn from(msg: ClientResponseContentV0) -> ProtocolMessage { fn from(msg: ClientResponseContentV0) -> ProtocolMessage {
let client_res = ClientResponse::V0(ClientResponseV0 { let client_res = ClientResponse::V0(ClientResponseV0 {

@ -43,6 +43,10 @@ impl BlockV0 {
b b
} }
pub fn dummy() -> BlockV0 {
BlockV0::new(vec![], None, vec![], None)
}
pub fn new_random_access( pub fn new_random_access(
children: Vec<BlockId>, children: Vec<BlockId>,
content: Vec<u8>, content: Vec<u8>,
@ -115,6 +119,10 @@ impl Block {
Block::V0(BlockV0::new(children, header_ref, content, key)) Block::V0(BlockV0::new(children, header_ref, content, key))
} }
pub fn dummy() -> Block {
Block::V0(BlockV0::dummy())
}
pub fn new_random_access( pub fn new_random_access(
children: Vec<BlockId>, children: Vec<BlockId>,
content: Vec<u8>, content: Vec<u8>,

@ -25,6 +25,11 @@ pub trait BlockStorage: Send + Sync {
/// Load a block from the storage. /// Load a block from the storage.
fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError>; fn get(&self, overlay: &OverlayId, id: &BlockId) -> Result<Block, StorageError>;
// fetch a block from broker or core overlay
// pub async fn fetch(&self, id: &BlockId) -> Result<Block, StorageError> {
// todo!();
// }
/// Save a block to the storage. /// Save a block to the storage.
fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError>; fn put(&self, overlay: &OverlayId, block: &Block) -> Result<BlockId, StorageError>;

@ -125,7 +125,7 @@ impl Branch {
//log_debug!(" target_heads: {:?}", target_heads); //log_debug!(" target_heads: {:?}", target_heads);
//log_debug!(" known_heads: {:?}", known_heads); //log_debug!(" known_heads: {:?}", known_heads);
/// Load causal past of a Commit `cobj` in a `Branch` from the `BlockStorage`, /// Load causal past of a Commit `cobj` in a `Branch` from the `Store`,
/// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG. /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG.
/// optionally collecting the missing objects/blocks that couldn't be found locally on the way /// optionally collecting the missing objects/blocks that couldn't be found locally on the way
fn load_causal_past( fn load_causal_past(

@ -30,13 +30,12 @@ use std::iter::FromIterator;
pub enum CommitLoadError { pub enum CommitLoadError {
MissingBlocks(Vec<BlockId>), MissingBlocks(Vec<BlockId>),
ObjectParseError, ObjectParseError,
NotACommitError, NotACommit,
NotACommitBodyError, NotACommitBody,
CannotBeAtRootOfBranch, CannotBeAtRootOfBranch,
MustBeAtRootOfBranch, MustBeAtRootOfBranch,
SingletonCannotHaveHeader, SingletonCannotHaveHeader,
BodyLoadError(Vec<BlockId>), MalformedHeader,
HeaderLoadError,
BodyTypeMismatch, BodyTypeMismatch,
} }
@ -45,7 +44,6 @@ pub enum CommitVerifyError {
InvalidSignature, InvalidSignature,
InvalidHeader, InvalidHeader,
PermissionDenied, PermissionDenied,
DepLoadError(CommitLoadError),
} }
impl CommitV0 { impl CommitV0 {
@ -308,13 +306,36 @@ impl Commit {
) -> Result<Commit, CommitLoadError> { ) -> Result<Commit, CommitLoadError> {
let (id, key) = (commit_ref.id, commit_ref.key); let (id, key) = (commit_ref.id, commit_ref.key);
match Object::load(id, Some(key.clone()), store) { match Object::load(id, Some(key.clone()), store) {
Err(ObjectParseError::MissingHeaderBlocks((obj, mut missing))) => {
if with_body {
let content = obj
.content()
.map_err(|_e| CommitLoadError::ObjectParseError)?;
let mut commit = match content {
ObjectContent::V0(ObjectContentV0::Commit(c)) => c,
_ => return Err(CommitLoadError::NotACommit),
};
commit.set_id(id);
commit.set_key(key.clone());
match commit.load_body(store) {
Ok(_) => return Err(CommitLoadError::MissingBlocks(missing)),
Err(CommitLoadError::MissingBlocks(mut missing_body)) => {
missing.append(&mut missing_body);
return Err(CommitLoadError::MissingBlocks(missing));
}
Err(e) => return Err(e),
}
} else {
return Err(CommitLoadError::MissingBlocks(missing));
}
}
Ok(obj) => { Ok(obj) => {
let content = obj let content = obj
.content() .content()
.map_err(|_e| CommitLoadError::ObjectParseError)?; .map_err(|_e| CommitLoadError::ObjectParseError)?;
let mut commit = match content { let mut commit = match content {
ObjectContent::V0(ObjectContentV0::Commit(c)) => c, ObjectContent::V0(ObjectContentV0::Commit(c)) => c,
_ => return Err(CommitLoadError::NotACommitError), _ => return Err(CommitLoadError::NotACommit),
}; };
commit.set_id(id); commit.set_id(id);
commit.set_key(key.clone()); commit.set_key(key.clone());
@ -341,7 +362,7 @@ impl Commit {
let content = self.content_v0(); let content = self.content_v0();
let (id, key) = (content.body.id, content.body.key.clone()); let (id, key) = (content.body.id, content.body.key.clone());
let obj = Object::load(id.clone(), Some(key.clone()), store).map_err(|e| match e { let obj = Object::load(id.clone(), Some(key.clone()), store).map_err(|e| match e {
ObjectParseError::MissingBlocks(missing) => CommitLoadError::BodyLoadError(missing), ObjectParseError::MissingBlocks(missing) => CommitLoadError::MissingBlocks(missing),
_ => CommitLoadError::ObjectParseError, _ => CommitLoadError::ObjectParseError,
})?; })?;
let content = obj let content = obj
@ -352,7 +373,7 @@ impl Commit {
self.set_body(body); self.set_body(body);
Ok(self.body().unwrap()) Ok(self.body().unwrap())
} }
_ => Err(CommitLoadError::NotACommitBodyError), _ => Err(CommitLoadError::NotACommitBody),
} }
} }
@ -448,7 +469,7 @@ impl Commit {
// load deps (the previous RootBranch commit) // load deps (the previous RootBranch commit)
let deps = self.deps(); let deps = self.deps();
if deps.len() != 1 { if deps.len() != 1 {
Err(CommitLoadError::HeaderLoadError) Err(CommitLoadError::MalformedHeader)
} else { } else {
let previous_rootbranch_commit = Commit::load(deps[0].clone(), store, true)?; let previous_rootbranch_commit = Commit::load(deps[0].clone(), store, true)?;
let previous_rootbranch = previous_rootbranch_commit let previous_rootbranch = previous_rootbranch_commit
@ -472,7 +493,7 @@ impl Commit {
return Ok(false); return Ok(false);
} }
} }
Err(CommitLoadError::HeaderLoadError) Err(CommitLoadError::MalformedHeader)
} }
CommitBody::V0(CommitBodyV0::Delete(_)) => Ok(true), CommitBody::V0(CommitBodyV0::Delete(_)) => Ok(true),
_ => Ok(false), _ => Ok(false),
@ -623,7 +644,7 @@ impl Commit {
) -> Result<Vec<ObjectId>, CommitLoadError> { ) -> Result<Vec<ObjectId>, CommitLoadError> {
//log_debug!(">> verify_full_object_refs_of_branch_at_commit: #{}", self.seq()); //log_debug!(">> verify_full_object_refs_of_branch_at_commit: #{}", self.seq());
/// Load `Commit`s of a `Branch` from the `BlockStorage` starting from the given `Commit`, /// Load `Commit`s of a `Branch` from the `Store` starting from the given `Commit`,
/// and collect missing `ObjectId`s /// and collect missing `ObjectId`s
fn load_direct_object_refs( fn load_direct_object_refs(
commit: &Commit, commit: &Commit,
@ -658,7 +679,7 @@ impl Commit {
Err(CommitLoadError::MissingBlocks(m)) => { Err(CommitLoadError::MissingBlocks(m)) => {
// The commit body is missing. // The commit body is missing.
missing.extend(m.clone()); missing.extend(m.clone());
Err(CommitLoadError::BodyLoadError(m)) Err(CommitLoadError::MissingBlocks(m))
} }
Err(e) => Err(e), Err(e) => Err(e),
}?; }?;
@ -1551,14 +1572,11 @@ mod test {
let obj = Object::new(content.clone(), None, max_object_size, &store); let obj = Object::new(content.clone(), None, max_object_size, &store);
let hashmap_storage = HashMapBlockStorage::new();
let storage = Box::new(hashmap_storage);
_ = obj.save(&store).expect("save object"); _ = obj.save(&store).expect("save object");
let commit = Commit::load(obj.reference().unwrap(), &store, false); let commit = Commit::load(obj.reference().unwrap(), &store, false);
assert_eq!(commit, Err(CommitLoadError::NotACommitError)); assert_eq!(commit, Err(CommitLoadError::NotACommit));
} }
#[test] #[test]
@ -1649,7 +1667,7 @@ mod test {
// match commit.load_body(repo.store.unwrap()) { // match commit.load_body(repo.store.unwrap()) {
// Ok(_b) => panic!("Body should not exist"), // Ok(_b) => panic!("Body should not exist"),
// Err(CommitLoadError::BodyLoadError(missing)) => { // Err(CommitLoadError::MissingBlocks(missing)) => {
// assert_eq!(missing.len(), 1); // assert_eq!(missing.len(), 1);
// } // }
// Err(e) => panic!("Commit load error: {:?}", e), // Err(e) => panic!("Commit load error: {:?}", e),
@ -1658,7 +1676,7 @@ mod test {
commit.verify_sig(&repo).expect("verify signature"); commit.verify_sig(&repo).expect("verify signature");
match commit.verify_perm(&repo) { match commit.verify_perm(&repo) {
Ok(_) => panic!("Commit should not be Ok"), Ok(_) => panic!("Commit should not be Ok"),
Err(NgError::CommitLoadError(CommitLoadError::BodyLoadError(missing))) => { Err(NgError::CommitLoadError(CommitLoadError::MissingBlocks(missing))) => {
assert_eq!(missing.len(), 1); assert_eq!(missing.len(), 1);
} }
Err(e) => panic!("Commit verify perm error: {:?}", e), Err(e) => panic!("Commit verify perm error: {:?}", e),
@ -1666,7 +1684,7 @@ mod test {
// match commit.verify_full_object_refs_of_branch_at_commit(repo.store.unwrap()) { // match commit.verify_full_object_refs_of_branch_at_commit(repo.store.unwrap()) {
// Ok(_) => panic!("Commit should not be Ok"), // Ok(_) => panic!("Commit should not be Ok"),
// Err(CommitLoadError::BodyLoadError(missing)) => { // Err(CommitLoadError::MissingBlocks(missing)) => {
// assert_eq!(missing.len(), 1); // assert_eq!(missing.len(), 1);
// } // }
// Err(e) => panic!("Commit verify error: {:?}", e), // Err(e) => panic!("Commit verify error: {:?}", e),
@ -1674,7 +1692,7 @@ mod test {
match commit.verify(&repo) { match commit.verify(&repo) {
Ok(_) => panic!("Commit should not be Ok"), Ok(_) => panic!("Commit should not be Ok"),
Err(NgError::CommitLoadError(CommitLoadError::BodyLoadError(missing))) => { Err(NgError::CommitLoadError(CommitLoadError::MissingBlocks(missing))) => {
assert_eq!(missing.len(), 1); assert_eq!(missing.len(), 1);
} }
Err(e) => panic!("Commit verify error: {:?}", e), Err(e) => panic!("Commit verify error: {:?}", e),

@ -10,6 +10,7 @@
//! Errors //! Errors
pub use crate::commit::{CommitLoadError, CommitVerifyError}; pub use crate::commit::{CommitLoadError, CommitVerifyError};
use crate::object::Object;
use num_enum::IntoPrimitive; use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive; use num_enum::TryFromPrimitive;
@ -35,8 +36,10 @@ pub enum NgError {
PermissionDenied, PermissionDenied,
InvalidPazzle, InvalidPazzle,
CommitLoadError(CommitLoadError), CommitLoadError(CommitLoadError),
ObjectParseError(ObjectParseError),
StorageError(StorageError), StorageError(StorageError),
NotFound, NotFound,
JsStorageKeyNotFound,
IoError, IoError,
CommitVerifyError(CommitVerifyError), CommitVerifyError(CommitVerifyError),
LocalBrokerNotInitialized, LocalBrokerNotInitialized,
@ -62,6 +65,7 @@ pub enum NgError {
InvalidResponse, InvalidResponse,
NotAServerError, NotAServerError,
VerifierError(VerifierError), VerifierError(VerifierError),
SiteNotFoundOnBroker,
} }
impl Error for NgError {} impl Error for NgError {}
@ -70,6 +74,7 @@ impl fmt::Display for NgError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Self::WalletError(string) => write!(f, "WalletError: {}", string), Self::WalletError(string) => write!(f, "WalletError: {}", string),
Self::JsStorageWriteError(string) => write!(f, "JsStorageWriteError: {}", string),
_ => write!(f, "{:?}", self), _ => write!(f, "{:?}", self),
} }
} }
@ -149,7 +154,18 @@ impl From<StorageError> for NgError {
impl From<VerifierError> for NgError { impl From<VerifierError> for NgError {
fn from(e: VerifierError) -> Self { fn from(e: VerifierError) -> Self {
NgError::VerifierError(e) match e {
VerifierError::InvalidKey => NgError::InvalidKey,
VerifierError::SerializationError => NgError::SerializationError,
VerifierError::CommitLoadError(e) => NgError::CommitLoadError(e),
VerifierError::StorageError(e) => NgError::StorageError(e),
VerifierError::ObjectParseError(e) => NgError::ObjectParseError(e),
VerifierError::TopicNotFound => NgError::TopicNotFound,
VerifierError::RepoNotFound => NgError::RepoNotFound,
VerifierError::StoreNotFound => NgError::StoreNotFound,
VerifierError::BranchNotFound => NgError::BranchNotFound,
_ => NgError::VerifierError(e),
}
} }
} }
@ -172,6 +188,8 @@ pub enum ObjectParseError {
BlockDeserializeError, BlockDeserializeError,
/// Error deserializing content of the object /// Error deserializing content of the object
ObjectDeserializeError, ObjectDeserializeError,
MissingHeaderBlocks((Object, Vec<BlockId>)),
} }
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
@ -211,6 +229,8 @@ pub enum ServerError {
SequenceMismatch, SequenceMismatch,
FileError, FileError,
RepoAlreadyOpened, RepoAlreadyOpened,
NotFound,
EmptyStream,
} }
impl ServerError { impl ServerError {
@ -218,7 +238,7 @@ impl ServerError {
*self == ServerError::PartialContent || *self == ServerError::EndOfStream *self == ServerError::PartialContent || *self == ServerError::EndOfStream
} }
pub fn is_err(&self) -> bool { pub fn is_err(&self) -> bool {
*self != ServerError::Ok *self != ServerError::Ok && !self.is_stream()
} }
} }
@ -228,6 +248,7 @@ pub enum VerifierError {
MissingCommitInDag, MissingCommitInDag,
CommitBodyNotFound, CommitBodyNotFound,
InvalidKey, InvalidKey,
SerializationError,
OtherError(String), OtherError(String),
CommitLoadError(CommitLoadError), CommitLoadError(CommitLoadError),
InvalidRepositoryCommit, InvalidRepositoryCommit,
@ -240,6 +261,8 @@ pub enum VerifierError {
MalformedSyncSignatureDeps, MalformedSyncSignatureDeps,
TopicNotFound, TopicNotFound,
RepoNotFound, RepoNotFound,
StoreNotFound,
BranchNotFound,
InvalidBranch, InvalidBranch,
NoBlockStorageAvailable, NoBlockStorageAvailable,
RootBranchNotFound, RootBranchNotFound,
@ -250,6 +273,12 @@ impl From<NgError> for VerifierError {
match e { match e {
NgError::InvalidKey => VerifierError::InvalidKey, NgError::InvalidKey => VerifierError::InvalidKey,
NgError::RepoNotFound => VerifierError::RepoNotFound, NgError::RepoNotFound => VerifierError::RepoNotFound,
NgError::BranchNotFound => VerifierError::BranchNotFound,
NgError::SerializationError => VerifierError::SerializationError,
// NgError::JsStorageReadError
// NgError::JsStorageWriteError(String)
// NgError::JsStorageKeyNotFound
// NgError::InvalidFileFormat
_ => VerifierError::OtherError(e.to_string()), _ => VerifierError::OtherError(e.to_string()),
} }
} }

@ -213,7 +213,7 @@ impl EventV0 {
// first_id = Some(id) // first_id = Some(id)
// } // }
// } // }
// first_id.ok_or(NgError::CommitLoadError(CommitLoadError::NotACommitError)) // first_id.ok_or(NgError::CommitLoadError(CommitLoadError::NotACommit))
// } // }
/// opens an event with the key derived from information kept in Repo. /// opens an event with the key derived from information kept in Repo.

@ -40,7 +40,7 @@ pub const DATA_VARINT_EXTRA: usize = 4;
pub const BLOCK_MAX_DATA_EXTRA: usize = 4; pub const BLOCK_MAX_DATA_EXTRA: usize = 4;
#[derive(Debug)] #[derive(Debug, PartialEq, Eq, Clone)]
/// An Object in memory. This is not used to serialize data /// An Object in memory. This is not used to serialize data
pub struct Object { pub struct Object {
/// keeps the deduplicated blocks of the Object /// keeps the deduplicated blocks of the Object
@ -491,13 +491,29 @@ impl Object {
panic!("shouldn't happen") panic!("shouldn't happen")
} }
CommitHeaderObject::Id(id) => { CommitHeaderObject::Id(id) => {
let obj = Object::load(id, Some(header_ref.key.clone()), store)?; let obj_res = Object::load(id, Some(header_ref.key.clone()), store);
match obj.content()? { match obj_res {
Err(ObjectParseError::MissingBlocks(m)) => {
return Err(ObjectParseError::MissingHeaderBlocks((
Object {
blocks,
block_contents,
header: None,
header_blocks: vec![],
#[cfg(test)]
already_saved: false,
},
m,
)));
}
Err(e) => return Err(e),
Ok(obj) => match obj.content()? {
ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => { ObjectContent::V0(ObjectContentV0::CommitHeader(mut commit_header)) => {
commit_header.set_id(id); commit_header.set_id(id);
(Some(commit_header), Some(obj.blocks().cloned().collect())) (Some(commit_header), Some(obj.blocks().cloned().collect()))
} }
_ => return Err(ObjectParseError::InvalidHeader), _ => return Err(ObjectParseError::InvalidHeader),
},
} }
} }
CommitHeaderObject::EncryptedContent(content) => { CommitHeaderObject::EncryptedContent(content) => {

@ -23,7 +23,7 @@ use rocksdb::{
}; };
pub struct RocksdbTransaction<'a> { pub struct RocksdbTransaction<'a> {
store: &'a RocksdbKCVStorage, store: &'a RocksDbKCVStorage,
tx: Option<rocksdb::Transaction<'a, TransactionDB>>, tx: Option<rocksdb::Transaction<'a, TransactionDB>>,
} }
@ -64,7 +64,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> {
family: &Option<String>, family: &Option<String>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> { ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
let property_start = let property_start =
RocksdbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix); RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix);
let iter = self.get_iterator(&property_start, &family)?; let iter = self.get_iterator(&property_start, &family)?;
self.store self.store
.get_all_keys_and_values_(prefix, key_size, key_prefix, suffix, iter) .get_all_keys_and_values_(prefix, key_size, key_prefix, suffix, iter)
@ -79,7 +79,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> {
) -> Result<HashMap<u8, Vec<u8>>, StorageError> { ) -> Result<HashMap<u8, Vec<u8>>, StorageError> {
let key_size = key.len(); let key_size = key.len();
let prop_values = self.get_all_keys_and_values(prefix, key_size, key, None, family)?; let prop_values = self.get_all_keys_and_values(prefix, key_size, key, None, family)?;
Ok(RocksdbKCVStorage::get_all_properties_of_key( Ok(RocksDbKCVStorage::get_all_properties_of_key(
prop_values, prop_values,
key_size, key_size,
&properties, &properties,
@ -94,7 +94,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> {
suffix: Option<u8>, suffix: Option<u8>,
family: &Option<String>, family: &Option<String>,
) -> Result<Vec<u8>, StorageError> { ) -> Result<Vec<u8>, StorageError> {
let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix);
let res = match family { let res = match family {
Some(cf) => self.tx().get_for_update_cf( Some(cf) => self.tx().get_for_update_cf(
self.store self.store
@ -152,7 +152,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
value: &Vec<u8>, value: &Vec<u8>,
family: &Option<String>, family: &Option<String>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix);
match family { match family {
Some(cf) => self.tx().put_cf( Some(cf) => self.tx().put_cf(
self.store self.store
@ -189,7 +189,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
suffix: Option<u8>, suffix: Option<u8>,
family: &Option<String>, family: &Option<String>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let property = RocksdbKCVStorage::compute_property(prefix, key, &suffix); let property = RocksDbKCVStorage::compute_property(prefix, key, &suffix);
let res = match family { let res = match family {
Some(cf) => self.tx().delete_cf( Some(cf) => self.tx().delete_cf(
self.store self.store
@ -253,7 +253,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
family: &Option<String>, family: &Option<String>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let key_size = key.len() + property_size; let key_size = key.len() + property_size;
let property_start = RocksdbKCVStorage::calc_key_start(prefix, key_size, &key, &suffix); let property_start = RocksDbKCVStorage::calc_key_start(prefix, key_size, &key, &suffix);
let mut iter = self.get_iterator(&property_start, &family)?; let mut iter = self.get_iterator(&property_start, &family)?;
let mut vec_key_end = key.clone(); let mut vec_key_end = key.clone();
@ -261,7 +261,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
vec_key_end.append(&mut trailing_max); vec_key_end.append(&mut trailing_max);
// let property_start = Self::compute_property(prefix, &vec_key_start, suffix); // let property_start = Self::compute_property(prefix, &vec_key_start, suffix);
let property_end = RocksdbKCVStorage::compute_property( let property_end = RocksDbKCVStorage::compute_property(
prefix, prefix,
&vec_key_end, &vec_key_end,
&Some(suffix.unwrap_or(255u8)), &Some(suffix.unwrap_or(255u8)),
@ -299,7 +299,7 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
} }
} }
pub struct RocksdbKCVStorage { pub struct RocksDbKCVStorage {
/// the main store where all the properties of keys are stored /// the main store where all the properties of keys are stored
db: TransactionDB, db: TransactionDB,
/// path for the storage backend data /// path for the storage backend data
@ -320,7 +320,7 @@ fn compare<T: Ord>(a: &[T], b: &[T]) -> std::cmp::Ordering {
return a.len().cmp(&b.len()); return a.len().cmp(&b.len());
} }
impl ReadTransaction for RocksdbKCVStorage { impl ReadTransaction for RocksDbKCVStorage {
/// returns a list of (key,value) that are in the range specified in the request /// returns a list of (key,value) that are in the range specified in the request
fn get_all_keys_and_values( fn get_all_keys_and_values(
&self, &self,
@ -407,7 +407,7 @@ impl ReadTransaction for RocksdbKCVStorage {
} }
} }
impl KCVStorage for RocksdbKCVStorage { impl KCVStorage for RocksDbKCVStorage {
fn write_transaction( fn write_transaction(
&self, &self,
method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>, method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>,
@ -427,7 +427,7 @@ impl KCVStorage for RocksdbKCVStorage {
} }
} }
impl WriteTransaction for RocksdbKCVStorage { impl WriteTransaction for RocksDbKCVStorage {
/// Save a property value to the store. /// Save a property value to the store.
fn put( fn put(
&self, &self,
@ -508,7 +508,7 @@ impl WriteTransaction for RocksdbKCVStorage {
} }
} }
impl RocksdbKCVStorage { impl RocksDbKCVStorage {
pub fn path(&self) -> PathBuf { pub fn path(&self) -> PathBuf {
PathBuf::from(&self.path) PathBuf::from(&self.path)
} }
@ -652,7 +652,7 @@ impl RocksdbKCVStorage {
/// Opens the store and returns a KCVStorage object that should be kept and used to manipulate the properties /// Opens the store and returns a KCVStorage object that should be kept and used to manipulate the properties
/// The key is the encryption key for the data at rest. /// The key is the encryption key for the data at rest.
pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<RocksdbKCVStorage, StorageError> { pub fn open<'a>(path: &Path, key: [u8; 32]) -> Result<RocksDbKCVStorage, StorageError> {
let mut opts = Options::default(); let mut opts = Options::default();
//opts.set_use_fsync(true); //opts.set_use_fsync(true);
opts.create_if_missing(true); opts.create_if_missing(true);
@ -668,7 +668,7 @@ impl RocksdbKCVStorage {
Env::version() Env::version()
); );
Ok(RocksdbKCVStorage { Ok(RocksDbKCVStorage {
db: db, db: db,
path: path.to_str().unwrap().to_string(), path: path.to_str().unwrap().to_string(),
}) })

@ -34,6 +34,7 @@ threshold_crypto = "0.4.0"
rand = { version = "0.7", features = ["getrandom"] } rand = { version = "0.7", features = ["getrandom"] }
web-time = "0.2.0" web-time = "0.2.0"
either = "1.8.1" either = "1.8.1"
futures = "0.3.24"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" } ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" }

@ -17,7 +17,7 @@ use ng_repo::block_storage::BlockStorage;
use ng_repo::repo::{BranchInfo, Repo}; use ng_repo::repo::{BranchInfo, Repo};
use ng_repo::store::Store; use ng_repo::store::Store;
use ng_repo::{errors::StorageError, types::*}; use ng_repo::{errors::StorageError, types::*};
use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::{ use std::{
@ -27,13 +27,13 @@ use std::{
}; };
pub(crate) struct RocksDbUserStorage { pub(crate) struct RocksDbUserStorage {
user_storage: RocksdbKCVStorage, user_storage: RocksDbKCVStorage,
} }
impl RocksDbUserStorage { impl RocksDbUserStorage {
pub fn open(path: &PathBuf, master_key: [u8; 32]) -> Result<Self, StorageError> { pub fn open(path: &PathBuf, master_key: [u8; 32]) -> Result<Self, StorageError> {
Ok(RocksDbUserStorage { Ok(RocksDbUserStorage {
user_storage: RocksdbKCVStorage::open(path, master_key)?, user_storage: RocksDbKCVStorage::open(path, master_key)?,
}) })
} }
} }

@ -105,6 +105,7 @@ impl fmt::Debug for JsSaveSessionConfig {
pub enum VerifierConfigType { pub enum VerifierConfigType {
/// nothing will be saved on disk during the session /// nothing will be saved on disk during the session
Memory, Memory,
/// only the session information is saved locally. the UserStorage is not saved.
JsSaveSession(JsSaveSessionConfig), JsSaveSession(JsSaveSessionConfig),
/// will save all user data locally, with RocksDb backend /// will save all user data locally, with RocksDb backend
RocksDb(PathBuf), RocksDb(PathBuf),
@ -130,6 +131,13 @@ impl VerifierConfigType {
_ => false, _ => false,
} }
} }
pub(crate) fn is_in_memory(&self) -> bool {
match self {
Self::Memory | Self::JsSaveSession(_) => true,
_ => false,
}
}
} }
#[derive(Debug)] #[derive(Debug)]

@ -10,6 +10,7 @@
//! Repo object (on heap) to handle a Repository //! Repo object (on heap) to handle a Repository
use crate::commits::*; use crate::commits::*;
use crate::types::*; use crate::types::*;
use async_std::stream::StreamExt;
use ng_net::actor::SoS; use ng_net::actor::SoS;
use ng_net::broker::{Broker, BROKER}; use ng_net::broker::{Broker, BROKER};
use ng_repo::log::*; use ng_repo::log::*;
@ -35,7 +36,7 @@ use core::fmt;
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use crate::rocksdb_user_storage::RocksDbUserStorage; use crate::rocksdb_user_storage::RocksDbUserStorage;
use crate::user_storage::{InMemoryUserStorage, UserStorage}; use crate::user_storage::{InMemoryUserStorage, UserStorage};
use async_std::sync::{Mutex, RwLockWriteGuard}; use async_std::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard};
use std::{collections::HashMap, path::PathBuf, sync::Arc}; use std::{collections::HashMap, path::PathBuf, sync::Arc};
use ng_net::{ use ng_net::{
@ -169,8 +170,25 @@ impl Verifier {
self.config.config_type.is_persistent() self.config.config_type.is_persistent()
} }
fn is_in_memory(&self) -> bool {
self.config.config_type.is_in_memory()
}
fn get_arc_block_storage(
&self,
) -> Result<Arc<std::sync::RwLock<dyn BlockStorage + Send + Sync>>, VerifierError> {
Ok(Arc::clone(
self.block_storage
.as_ref()
.ok_or(VerifierError::NoBlockStorageAvailable)?,
))
}
pub fn get_store_or_load(&mut self, store_repo: &StoreRepo) -> Arc<Store> { pub fn get_store_or_load(&mut self, store_repo: &StoreRepo) -> Arc<Store> {
let overlay_id = store_repo.overlay_id_for_storage_purpose(); let overlay_id = store_repo.overlay_id_for_storage_purpose();
let block_storage = self
.get_arc_block_storage()
.expect("get_store_or_load cannot be called on Remote Verifier");
let store = self.stores.entry(overlay_id).or_insert_with(|| { let store = self.stores.entry(overlay_id).or_insert_with(|| {
// FIXME: get store_readcap and store_overlay_branch_readcap from user storage // FIXME: get store_readcap and store_overlay_branch_readcap from user storage
let store_readcap = ReadCap::nil(); let store_readcap = ReadCap::nil();
@ -179,13 +197,7 @@ impl Verifier {
*store_repo, *store_repo,
store_readcap, store_readcap,
store_overlay_branch_readcap, store_overlay_branch_readcap,
Arc::clone( block_storage,
&self
.block_storage
.as_ref()
.ok_or(core::fmt::Error)
.expect("get_store_or_load cannot be called on Remote Verifier"),
),
); );
Arc::new(store) Arc::new(store)
}); });
@ -260,9 +272,12 @@ impl Verifier {
Ok(()) Ok(())
} }
pub fn get_store(&self, store_repo: &StoreRepo) -> Result<Arc<Store>, NgError> { pub fn get_store(&self, store_repo: &StoreRepo) -> Result<Arc<Store>, VerifierError> {
let overlay_id = store_repo.overlay_id_for_storage_purpose(); let overlay_id = store_repo.overlay_id_for_storage_purpose();
let store = self.stores.get(&overlay_id).ok_or(NgError::StoreNotFound)?; let store = self
.stores
.get(&overlay_id)
.ok_or(VerifierError::StoreNotFound)?;
Ok(Arc::clone(store)) Ok(Arc::clone(store))
} }
@ -303,12 +318,15 @@ impl Verifier {
repo_id: &RepoId, repo_id: &RepoId,
branch_id: &BranchId, branch_id: &BranchId,
current_heads: Vec<ObjectRef>, current_heads: Vec<ObjectRef>,
) -> Result<(), NgError> { ) -> Result<(), VerifierError> {
let repo = self.repos.get_mut(repo_id).ok_or(NgError::RepoNotFound)?; let repo = self
.repos
.get_mut(repo_id)
.ok_or(VerifierError::RepoNotFound)?;
let branch = repo let branch = repo
.branches .branches
.get_mut(branch_id) .get_mut(branch_id)
.ok_or(NgError::BranchNotFound)?; .ok_or(VerifierError::BranchNotFound)?;
branch.current_heads = current_heads; branch.current_heads = current_heads;
Ok(()) Ok(())
} }
@ -498,7 +516,7 @@ impl Verifier {
if self.connected_server_id.is_some() { if self.connected_server_id.is_some() {
// send the event to the server already // send the event to the server already
let broker = BROKER.write().await; let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub(); let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap(); let remote = self.connected_server_id.to_owned().unwrap();
self.send_event(event, &broker, &user, &remote, overlay) self.send_event(event, &broker, &user, &remote, overlay)
@ -546,7 +564,7 @@ impl Verifier {
async fn send_event<'a>( async fn send_event<'a>(
&mut self, &mut self,
event: Event, event: Event,
broker: &RwLockWriteGuard<'a, Broker<'a>>, broker: &RwLockReadGuard<'a, Broker<'a>>,
user: &UserId, user: &UserId,
remote: &DirectPeerId, remote: &DirectPeerId,
overlay: OverlayId, overlay: OverlayId,
@ -665,20 +683,26 @@ impl Verifier {
} }
} }
fn user_storage_if_persistent(&self) -> Option<Arc<Box<dyn UserStorage>>> {
if self.is_persistent() {
if let Some(us) = self.user_storage.as_ref() {
Some(Arc::clone(us))
} else {
None
}
} else {
None
}
}
pub(crate) fn add_branch_and_save( pub(crate) fn add_branch_and_save(
&mut self, &mut self,
repo_id: &RepoId, repo_id: &RepoId,
branch_info: BranchInfo, branch_info: BranchInfo,
store_repo: &StoreRepo, store_repo: &StoreRepo,
) -> Result<(), VerifierError> { ) -> Result<(), VerifierError> {
let user_storage = self if let Some(user_storage) = self.user_storage_if_persistent() {
.user_storage user_storage.add_branch(repo_id, &branch_info)?;
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
user_storage.unwrap().add_branch(repo_id, &branch_info)?;
} }
let branch_id = branch_info.id.clone(); let branch_id = branch_info.id.clone();
let topic_id = branch_info.topic.clone(); let topic_id = branch_info.topic.clone();
@ -702,56 +726,82 @@ impl Verifier {
branch_id: &BranchId, branch_id: &BranchId,
store_repo: &StoreRepo, store_repo: &StoreRepo,
) -> Result<(), VerifierError> { ) -> Result<(), VerifierError> {
let user_storage = self if let Some(user_storage) = self.user_storage_if_persistent() {
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
let repo = self.get_repo(repo_id, store_repo)?; let repo = self.get_repo(repo_id, store_repo)?;
user_storage user_storage.add_branch(repo_id, repo.branch(branch_id)?)?;
.unwrap()
.add_branch(repo_id, repo.branch(branch_id)?)?;
} }
Ok(()) Ok(())
} }
pub(crate) fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), VerifierError> { pub(crate) fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), VerifierError> {
let user_storage = self if let Some(user_storage) = self.user_storage_if_persistent() {
.user_storage user_storage.update_signer_cap(signer_cap)?;
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
user_storage.unwrap().update_signer_cap(signer_cap)?;
} }
Ok(()) Ok(())
} }
pub(crate) fn add_repo_and_save(&mut self, repo: Repo) -> &Repo { pub(crate) fn add_repo_and_save(&mut self, repo: Repo) -> &Repo {
let user_storage = self let us = self.user_storage_if_persistent();
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
let repo_ref: &Repo = self.add_repo_(repo); let repo_ref: &Repo = self.add_repo_(repo);
// save in user_storage // save in user_storage
if user_storage.is_some() { if let Some(user_storage) = us {
let _ = user_storage.unwrap().save_repo(repo_ref); let _ = user_storage.save_repo(repo_ref);
} }
repo_ref repo_ref
} }
pub(crate) fn get_repo(&self, id: &RepoId, store_repo: &StoreRepo) -> Result<&Repo, NgError> { pub(crate) fn get_repo(
&self,
id: &RepoId,
store_repo: &StoreRepo,
) -> Result<&Repo, VerifierError> {
//let store = self.get_store(store_repo); //let store = self.get_store(store_repo);
let repo_ref = self.repos.get(id).ok_or(NgError::RepoNotFound); let repo_ref = self.repos.get(id).ok_or(VerifierError::RepoNotFound);
repo_ref repo_ref
} }
pub async fn bootstrap(&mut self) -> Result<(), NgError> {
if self.is_in_memory() {
// TODO only bootstrap if 3P stores of personal site not already loaded (by replay)
let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap();
let read_cap = self.config.private_store_read_cap.as_ref().unwrap();
// first we fetch the read_cap commit of private store repo.
let msg = CommitGet::V0(CommitGetV0 {
id: read_cap.id,
topic: None, // we dont have the topic (only available from RepoLink/BranchLink) but we are pretty sure the Broker has the commit anyway.
overlay: Some(OverlayId::outer(
self.config.private_store_id.as_ref().unwrap(),
)),
});
match broker
.request::<CommitGet, Block>(&user, &remote, msg)
.await
{
Err(NgError::ServerError(ServerError::NotFound)) => {
// TODO: fallback to BlockGet, then Commit::load(with_body:true), which will return an Err(CommitLoadError::MissingBlocks), then do another BlockGet with those, and then again Commit::load...
return Err(NgError::SiteNotFoundOnBroker);
}
Ok(SoS::Stream(mut blockstream)) => {
while let Some(block) = blockstream.next().await {
log_info!("GOT BLOCK {:?}", block);
}
Ok(())
}
Ok(_) => return Err(NgError::InvalidResponse),
Err(e) => return Err(e),
}
} else {
Ok(())
}
}
fn load_from_credentials_and_events( fn load_from_credentials_and_events(
&mut self, &mut self,
events: &Vec<EventOutboxStorage>, events: &Vec<EventOutboxStorage>,
) -> Result<(), NgError> { ) -> Result<(), VerifierError> {
let private_store_id = self.config.private_store_id.as_ref().unwrap(); let private_store_id = self.config.private_store_id.as_ref().unwrap();
let private_outer_overlay_id = OverlayId::outer(private_store_id); let private_outer_overlay_id = OverlayId::outer(private_store_id);
let private_inner_overlay_id = OverlayId::inner( let private_inner_overlay_id = OverlayId::inner(
@ -767,13 +817,7 @@ impl Verifier {
store_repo, store_repo,
self.config.private_store_read_cap.to_owned().unwrap(), self.config.private_store_read_cap.to_owned().unwrap(),
self.config.private_store_read_cap.to_owned().unwrap(), self.config.private_store_read_cap.to_owned().unwrap(),
Arc::clone( self.get_arc_block_storage()?,
&self
.block_storage
.as_ref()
.ok_or(core::fmt::Error)
.expect("couldn't get the block_storage"),
),
)); ));
let store = self let store = self
@ -946,7 +990,7 @@ impl Verifier {
pub async fn send_outbox(&mut self) -> Result<(), NgError> { pub async fn send_outbox(&mut self) -> Result<(), NgError> {
let events: Vec<EventOutboxStorage> = self.take_events_from_outbox()?; let events: Vec<EventOutboxStorage> = self.take_events_from_outbox()?;
let broker = BROKER.write().await; let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub(); let user = self.config.user_priv_key.to_pub();
let remote = self let remote = self
.connected_server_id .connected_server_id
@ -1191,14 +1235,7 @@ impl Verifier {
&mut self, &mut self,
update: &StoreUpdate, update: &StoreUpdate,
) -> Result<(), VerifierError> { ) -> Result<(), VerifierError> {
let store = Store::new_from( let store = Store::new_from(update, self.get_arc_block_storage()?);
update,
Arc::clone(
self.block_storage
.as_ref()
.ok_or(VerifierError::NoBlockStorageAvailable)?,
),
);
let overlay_id = store.get_store_repo().overlay_id_for_storage_purpose(); let overlay_id = store.get_store_repo().overlay_id_for_storage_purpose();
let store = self let store = self
.stores .stores
@ -1220,6 +1257,7 @@ impl Verifier {
true => SymKey::nil(), true => SymKey::nil(),
}; };
let overlay_id = store_repo.overlay_id_for_storage_purpose(); let overlay_id = store_repo.overlay_id_for_storage_purpose();
let block_storage = self.get_arc_block_storage()?;
let store = self.stores.entry(overlay_id).or_insert_with(|| { let store = self.stores.entry(overlay_id).or_insert_with(|| {
let store_readcap = ReadCap::nil(); let store_readcap = ReadCap::nil();
// temporarily set the store_overlay_branch_readcap to an objectRef that has an empty id, and a key = to the repo_write_cap_secret // temporarily set the store_overlay_branch_readcap to an objectRef that has an empty id, and a key = to the repo_write_cap_secret
@ -1229,13 +1267,7 @@ impl Verifier {
*store_repo, *store_repo,
store_readcap, store_readcap,
store_overlay_branch_readcap, store_overlay_branch_readcap,
Arc::clone( block_storage,
&self
.block_storage
.as_ref()
.ok_or(core::fmt::Error)
.expect("get_store_or_load cannot be called on Remote Verifier"),
),
); );
Arc::new(store) Arc::new(store)
}); });

@ -28,7 +28,7 @@ use ng_net::types::{APP_NG_ONE_URL, NG_ONE_URL};
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::types::*; use ng_repo::types::*;
use ng_repo::utils::{generate_keypair, sign, verify}; use ng_repo::utils::{generate_keypair, sign, verify};
use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage; use ng_storage_rocksdb::kcv_storage::RocksDbKCVStorage;
use ng_wallet::types::*; use ng_wallet::types::*;
#[derive(RustEmbed)] #[derive(RustEmbed)]
@ -36,7 +36,7 @@ use ng_wallet::types::*;
struct Static; struct Static;
struct Server { struct Server {
store: RocksdbKCVStorage, store: RocksDbKCVStorage,
} }
impl Server { impl Server {
@ -158,7 +158,7 @@ async fn main() {
let key: [u8; 32] = [0; 32]; let key: [u8; 32] = [0; 32];
log_debug!("data directory: {}", dir.to_str().unwrap()); log_debug!("data directory: {}", dir.to_str().unwrap());
fs::create_dir_all(dir.clone()).unwrap(); fs::create_dir_all(dir.clone()).unwrap();
let store = RocksdbKCVStorage::open(&dir, key); let store = RocksDbKCVStorage::open(&dir, key);
if store.is_err() { if store.is_err() {
return; return;
} }

Loading…
Cancel
Save