all events of wallet creation (3P site) sent to broker

pull/19/head
Niko PLP 7 months ago
parent 5fed085379
commit 32fffcb947
  1. 2
      Cargo.lock
  2. 4
      nextgraph/examples/open.rs
  3. 60
      nextgraph/src/local_broker.rs
  4. 13
      ng-app/src/routes/WalletCreate.svelte
  5. 2
      ng-broker/src/broker_storage/invitation.rs
  6. 24
      ng-broker/src/server_storage.rs
  7. 5
      ng-client-ws/src/remote_ws.rs
  8. 2
      ng-client-ws/src/remote_ws_wasm.rs
  9. 1
      ng-net/Cargo.toml
  10. 36
      ng-net/src/actor.rs
  11. 6
      ng-net/src/actors/add_invitation.rs
  12. 6
      ng-net/src/actors/add_user.rs
  13. 82
      ng-net/src/actors/client/event.rs
  14. 2
      ng-net/src/actors/client/mod.rs
  15. 21
      ng-net/src/actors/client/pin_repo.rs
  16. 16
      ng-net/src/actors/client/repo_pin_status.rs
  17. 19
      ng-net/src/actors/client/topic_sub.rs
  18. 7
      ng-net/src/actors/connecting.rs
  19. 5
      ng-net/src/actors/del_user.rs
  20. 6
      ng-net/src/actors/list_invitations.rs
  21. 6
      ng-net/src/actors/list_users.rs
  22. 3
      ng-net/src/actors/noise.rs
  23. 3
      ng-net/src/actors/probe.rs
  24. 9
      ng-net/src/actors/start.rs
  25. 17
      ng-net/src/broker.rs
  26. 343
      ng-net/src/broker_connection.rs
  27. 20
      ng-net/src/connection.rs
  28. 154
      ng-net/src/errors.rs
  29. 12
      ng-net/src/server_storage.rs
  30. 64
      ng-net/src/types.rs
  31. 1
      ng-repo/Cargo.toml
  32. 18
      ng-repo/src/branch.rs
  33. 15
      ng-repo/src/commit.rs
  34. 178
      ng-repo/src/errors.rs
  35. 15
      ng-repo/src/file.rs
  36. 11
      ng-repo/src/repo.rs
  37. 365
      ng-repo/src/store.rs
  38. 50
      ng-repo/src/types.rs
  39. 15
      ng-storage-rocksdb/src/kcv_storage.rs
  40. 116
      ng-verifier/src/site.rs
  41. 3
      ng-verifier/src/types.rs
  42. 39
      ng-verifier/src/user_storage/branch.rs
  43. 77
      ng-verifier/src/user_storage/repo.rs
  44. 139
      ng-verifier/src/verifier.rs
  45. 2
      ngcli/src/main.rs
  46. 587
      ngcli/src/old.rs

2
Cargo.lock generated

@ -3326,7 +3326,6 @@ dependencies = [
"ng-repo",
"noise-protocol",
"noise-rust-crypto",
"num_enum",
"once_cell",
"reqwest",
"serde",
@ -3355,6 +3354,7 @@ dependencies = [
"gloo-timers",
"hex",
"log",
"num_enum",
"once_cell",
"os_info",
"rand 0.7.3",

@ -37,7 +37,7 @@ async fn main() -> std::io::Result<()> {
}))
.await;
let wallet_name = "hQK0RBKua5TUm2jqeSGPOMMzqplllAkbUgEh5P6Otf4".to_string();
let wallet_name = "9ivXl3TpgcQlDKTmR9NOipjhPWxQw6Yg5jkWBTlJuXw".to_string();
// as we have previously saved the wallet,
// we can retrieve it, display the security phrase and image to the user, ask for the pazzle or mnemonic, and then open the wallet
@ -48,7 +48,7 @@ async fn main() -> std::io::Result<()> {
// now let's open the wallet, by providing the pazzle and PIN code
let opened_wallet = wallet_open_with_pazzle(
&wallet,
vec![134, 54, 112, 46, 94, 65, 20, 2, 99],
vec![110, 139, 115, 94, 9, 40, 74, 25, 52],
[2, 3, 2, 3],
)?;

@ -12,7 +12,6 @@ use async_std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use core::fmt;
use ng_net::actor::EActor;
use ng_net::connection::{ClientConfig, IConnect, NoiseFSM, StartConfig};
use ng_net::errors::ProtocolError;
use ng_net::types::{ClientInfo, ClientType, ProtocolMessage};
use ng_net::utils::{Receiver, Sender};
use ng_repo::block_storage::HashMapBlockStorage;
@ -30,7 +29,7 @@ use zeroize::{Zeroize, ZeroizeOnDrop};
use ng_net::broker::*;
use ng_repo::block_storage::BlockStorage;
use ng_repo::errors::NgError;
use ng_repo::errors::{NgError, ProtocolError};
use ng_repo::log::*;
use ng_repo::types::*;
use ng_repo::utils::derive_key;
@ -1424,6 +1423,7 @@ mod test {
};
use ng_net::types::BootstrapContentV0;
use ng_wallet::{display_mnemonic, emojis::display_pazzle};
use std::env::current_dir;
use std::fs::read_to_string;
use std::fs::{create_dir_all, File};
use std::io::BufReader;
@ -1537,6 +1537,62 @@ mod test {
file.write_all(&ser).expect("write of opened_wallet file");
}
#[async_std::test]
async fn gen_opened_wallet_file_for_test_with_pazzle_array() {
let wallet_file = read("tests/wallet.ngw").expect("read wallet file");
init_local_broker(Box::new(|| LocalBrokerConfig::InMemory)).await;
let wallet = wallet_read_file(wallet_file)
.await
.expect("wallet_read_file");
let pazzle = vec![114, 45, 86, 104, 1, 135, 17, 50, 65];
let opened_wallet =
wallet_open_with_pazzle(&wallet, pazzle, [2, 3, 2, 3]).expect("opening of wallet");
let mut file =
File::create("tests/opened_wallet.ngw").expect("open for write opened_wallet file");
let ser = serde_bare::to_vec(&opened_wallet).expect("serialization of opened wallet");
file.write_all(&ser).expect("write of opened_wallet file");
}
#[async_std::test]
async fn import_session_for_test_to_disk() {
let wallet_file = read("tests/wallet.ngw").expect("read wallet file");
let opened_wallet_file = read("tests/opened_wallet.ngw").expect("read opened_wallet file");
let opened_wallet: SensitiveWallet =
serde_bare::from_slice(&opened_wallet_file).expect("deserialization of opened_wallet");
let mut current_path = current_dir().expect("cur_dir");
current_path.push("..");
current_path.push(".ng");
current_path.push("example");
create_dir_all(current_path.clone()).expect("create_dir");
// initialize the local_broker with config to save to disk in a folder called `.ng/example` in the current directory
init_local_broker(Box::new(move || {
LocalBrokerConfig::BasePath(current_path.clone())
}))
.await;
let wallet = wallet_read_file(wallet_file)
.await
.expect("wallet_read_file");
let wallet_name = wallet.name();
let user_id = opened_wallet.personal_identity();
let _client = wallet_import(wallet, opened_wallet, false)
.await
.expect("wallet_import");
let _session = session_start(SessionConfig::new_in_memory(&user_id, &wallet_name))
.await
.expect("");
}
async fn import_session_for_test() -> (UserId, String) {
let wallet_file = read("tests/wallet.ngw").expect("read wallet file");
let opened_wallet_file = read("tests/opened_wallet.ngw").expect("read opened_wallet file");

@ -245,7 +245,7 @@
window.wallet_channel.postMessage(new_in_mem, location.href);
}
}
console.log(ready.pazzle);
console.log(display_pazzle(ready.pazzle));
download_name = "wallet-" + ready.wallet_name + ".ngw";
if (options.cloud) {
@ -1207,10 +1207,12 @@
would be the victim of a phishing attempt.
</Alert>
</p>
<p class="text-left mt-5">
<p class="max-w-xl md:mx-auto lg:max-w-2xl text-left mt-5 text-sm">
Here are the rules for the security phrase and image :
</p>
<ul class="text-left list-disc list-inside">
<ul
class="max-w-xl md:mx-auto lg:max-w-2xl text-left mt-5 text-sm list-disc list-inside"
>
<li>The phrase should be at least 10 characters long</li>
<li>
It should be something you will remember, but not something too
@ -1246,6 +1248,7 @@
with you, will be able to see this image and phrase.
</li>
</ul>
<input
bind:this={phrase}
class="mt-10 mr-0"
@ -1529,8 +1532,8 @@
/>
Now click on "Continue to Login."<br /><br />It is important that
you login with this wallet at least once from this device<br />
(while connected to the internet), so that your personal site is
created on your broker.<br /><br />
(while connected to the internet), so that your personal site is created
on your broker.<br /><br />
<a href="/wallet/login" use:link>
<button
tabindex="-1"

@ -14,8 +14,8 @@ use std::hash::Hash;
use std::hash::Hasher;
use std::time::SystemTime;
use ng_net::errors::ProtocolError;
use ng_net::types::*;
use ng_repo::errors::ProtocolError;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::types::SymKey;

@ -19,10 +19,9 @@ use crate::broker_storage::account::Account;
use crate::broker_storage::invitation::Invitation;
use crate::broker_storage::wallet::Wallet;
use crate::types::*;
use ng_net::errors::{ProtocolError, ServerError};
use ng_net::server_storage::*;
use ng_net::types::*;
use ng_repo::errors::StorageError;
use ng_repo::errors::{ProtocolError, ServerError, StorageError};
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::log::*;
use ng_repo::types::*;
@ -203,17 +202,18 @@ impl ServerStorage for RocksdbServerStorage {
&self,
overlay: &OverlayId,
repo: &RepoHash,
) -> Result<RepoPinStatus, ProtocolError> {
) -> Result<RepoPinStatus, ServerError> {
Err(ServerError::False)
//TODO: implement correctly !
Ok(RepoPinStatus::V0(RepoPinStatusV0 {
hash: repo.clone(),
// Ok(RepoPinStatus::V0(RepoPinStatusV0 {
// hash: repo.clone(),
// only possible for RW overlays
expose_outer: false,
// // only possible for RW overlays
// expose_outer: false,
// list of topics that are subscribed to
topics: vec![],
}))
// // list of topics that are subscribed to
// topics: vec![],
// }))
}
fn pin_repo(
@ -222,7 +222,7 @@ impl ServerStorage for RocksdbServerStorage {
repo: &RepoHash,
ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>,
) -> Result<RepoOpened, ProtocolError> {
) -> Result<RepoOpened, ServerError> {
//TODO: implement correctly !
let mut opened = Vec::with_capacity(ro_topics.len() + rw_topics.len());
for topic in ro_topics {
@ -240,7 +240,7 @@ impl ServerStorage for RocksdbServerStorage {
repo: &RepoHash,
topic: &TopicId,
publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ProtocolError> {
) -> Result<TopicSubRes, ServerError> {
//TODO: implement correctly !
Ok(TopicSubRes::V0(TopicSubResV0 {
topic: topic.clone(),

@ -24,10 +24,10 @@ use futures::{future, pin_mut, select, stream, StreamExt};
use futures::{FutureExt, SinkExt};
use async_std::task;
use ng_net::errors::*;
use ng_net::types::*;
use ng_net::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use ng_net::{connection::*, WS_PORT};
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::*;
use ng_repo::utils::{generate_keypair, now_timestamp};
@ -302,10 +302,9 @@ mod test {
use crate::remote_ws::*;
use async_std::task;
use ng_net::broker::*;
use ng_net::errors::NetError;
use ng_net::types::IP;
use ng_net::utils::{spawn_and_log_error, ResultSend};
use ng_repo::errors::NgError;
use ng_repo::errors::{NetError, NgError};
use ng_repo::log::*;
use ng_repo::utils::generate_keypair;
use std::net::IpAddr;

@ -15,10 +15,10 @@ use either::Either;
use futures::FutureExt;
use futures::{future, pin_mut, select, stream, SinkExt, StreamExt};
use ng_net::connection::*;
use ng_net::errors::*;
use ng_net::types::*;
use ng_net::utils::*;
use ng_net::WS_PORT;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::*;
use ng_repo::utils::{generate_keypair, now_timestamp};

@ -21,7 +21,6 @@ ng-repo = { path = "../ng-repo", version = "0.1.0" }
serde = { version = "1.0", features = ["derive"] }
serde_bare = "0.5.0"
serde_bytes = "0.11.7"
num_enum = "0.5.7"
async-broadcast = "0.4.1"
futures = "0.3.24"
async-trait = "0.1.64"

@ -18,7 +18,8 @@ use std::any::TypeId;
use std::sync::Arc;
use crate::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use crate::{connection::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{connection::*, types::ProtocolMessage};
use ng_repo::errors::{NgError, ProtocolError, ServerError};
use std::marker::PhantomData;
impl TryFrom<ProtocolMessage> for () {
@ -35,6 +36,8 @@ pub trait EActor: Send + Sync + std::fmt::Debug {
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError>;
fn set_id(&mut self, id: i64) {}
}
#[derive(Debug)]
@ -115,13 +118,13 @@ impl<
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<SoS<B>, ProtocolError> {
) -> Result<SoS<B>, NgError> {
fsm.lock().await.send(msg).await?;
let mut receiver = self.receiver.take().unwrap();
match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => {
if let ProtocolMessage::ClientMessage(ref bm) = msg {
if bm.result() == Into::<u16>::into(ProtocolError::PartialContent)
if bm.result() == Into::<u16>::into(ServerError::PartialContent)
&& TypeId::of::<B>() != TypeId::of::<()>()
{
let (mut b_sender, b_receiver) = mpsc::unbounded::<B>();
@ -142,7 +145,7 @@ impl<
{
if let ProtocolMessage::ClientMessage(ref bm) = msg {
if bm.result()
== Into::<u16>::into(ProtocolError::EndOfStream)
== Into::<u16>::into(ServerError::EndOfStream)
{
break;
}
@ -174,23 +177,34 @@ impl<
}
}
fsm.lock().await.remove_actor(self.id).await;
let response: B = msg.try_into()?;
let server_error: Result<ServerError, NgError> = (&msg).try_into();
let response: B = match msg.try_into() {
Ok(b) => b,
Err(ProtocolError::ServerError) => {
return Err(NgError::ServerError(server_error?));
}
Err(e) => return Err(NgError::ProtocolError(e)),
};
Ok(SoS::<B>::Single(response))
}
Some(ConnectionCommand::ProtocolError(e)) => Err(e),
Some(ConnectionCommand::Error(e)) => Err(e.into()),
Some(ConnectionCommand::Close) => Err(ProtocolError::Closing),
_ => Err(ProtocolError::ActorError),
Some(ConnectionCommand::ProtocolError(e)) => Err(e.into()),
Some(ConnectionCommand::Error(e)) => Err(ProtocolError::from(e).into()),
Some(ConnectionCommand::Close) => Err(ProtocolError::Closing.into()),
_ => Err(ProtocolError::ActorError.into()),
}
}
pub fn new_responder() -> Box<Self> {
Box::new(Self::new(0, false))
pub fn new_responder(id: i64) -> Box<Self> {
Box::new(Self::new(id, false))
}
pub fn get_receiver_tx(&self) -> Sender<ConnectionCommand> {
self.receiver_tx.clone()
}
pub fn id(&self) -> i64 {
self.id
}
}
#[cfg(test)]

@ -11,9 +11,9 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
@ -58,7 +58,7 @@ impl AddInvitation {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<AddInvitation, AdminResponse>::new_responder()
Actor::<AddInvitation, AdminResponse>::new_responder(0)
}
}

@ -11,9 +11,9 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
@ -48,7 +48,7 @@ impl AddUser {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<AddUser, AdminResponse>::new_responder()
Actor::<AddUser, AdminResponse>::new_responder(0)
}
}

@ -0,0 +1,82 @@
/*
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::repo::{BranchInfo, Repo};
use ng_repo::store::Store;
use ng_repo::types::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
impl PublishEvent {
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
Actor::<PublishEvent, ()>::new_responder(id)
}
pub fn new(event: Event, overlay: OverlayId) -> PublishEvent {
PublishEvent(event, Some(overlay))
}
pub fn set_overlay(&mut self, overlay: OverlayId) {
self.1 = Some(overlay);
}
// pub fn overlay(&self) -> &OverlayId {
// self.1.as_ref().unwrap()
// }
}
impl TryFrom<ProtocolMessage> for PublishEvent {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
let req: ClientRequestContentV0 = msg.try_into()?;
if let ClientRequestContentV0::PublishEvent(a) = req {
Ok(a)
} else {
log_debug!("INVALID {:?}", req);
Err(ProtocolError::InvalidValue)
}
}
}
impl From<PublishEvent> for ProtocolMessage {
fn from(msg: PublishEvent) -> ProtocolMessage {
let overlay = msg.1.unwrap();
ProtocolMessage::from_client_request_v0(ClientRequestContentV0::PublishEvent(msg), overlay)
}
}
impl Actor<'_, PublishEvent, ()> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, PublishEvent, ()> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
let req = PublishEvent::try_from(msg)?;
//TODO implement all the server side logic
let res: Result<(), ServerError> = Ok(());
fsm.lock()
.await
.send_in_reply_to(res.into(), self.id())
.await?;
Ok(())
}
}

@ -3,3 +3,5 @@ pub mod repo_pin_status;
pub mod pin_repo;
pub mod topic_sub;
pub mod event;

@ -11,9 +11,9 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::repo::Repo;
use ng_repo::types::*;
@ -21,14 +21,12 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
impl PinRepo {
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<PinRepo, RepoOpened>::new_responder()
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
Actor::<PinRepo, RepoOpened>::new_responder(id)
}
pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo {
let overlay = OverlayAccess::ReadWrite((
OverlayId::inner_from_store(&repo.store),
OverlayId::outer(repo.store.id()),
));
let overlay =
OverlayAccess::ReadWrite((repo.store.inner_overlay(), repo.store.outer_overlay()));
let mut rw_topics = Vec::with_capacity(repo.branches.len());
let mut ro_topics = vec![];
for (_, branch) in repo.branches.iter() {
@ -116,9 +114,12 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> {
req.hash(),
req.ro_topics(),
req.rw_topics(),
)?;
);
fsm.lock().await.send(res.into()).await?;
fsm.lock()
.await
.send_in_reply_to(res.into(), self.id())
.await?;
Ok(())
}
}

@ -11,17 +11,17 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
impl RepoPinStatusReq {
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<RepoPinStatusReq, RepoPinStatus>::new_responder()
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
Actor::<RepoPinStatusReq, RepoPinStatus>::new_responder(id)
}
}
@ -80,9 +80,11 @@ impl EActor for Actor<'_, RepoPinStatusReq, RepoPinStatus> {
let broker = BROKER.read().await;
let res = broker
.get_server_storage()?
.get_repo_pin_status(req.overlay(), req.hash())?;
fsm.lock().await.send(res.into()).await?;
.get_repo_pin_status(req.overlay(), req.hash());
fsm.lock()
.await
.send_in_reply_to(res.into(), self.id())
.await?;
Ok(())
}
}

@ -11,9 +11,9 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::repo::{BranchInfo, Repo};
use ng_repo::types::*;
@ -21,14 +21,14 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
impl TopicSub {
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<TopicSub, TopicSubRes>::new_responder()
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> {
Actor::<TopicSub, TopicSubRes>::new_responder(id)
}
/// only set broker_id if you want to be a publisher
pub fn new(repo: &Repo, branch: &BranchInfo, broker_id: Option<&DirectPeerId>) -> TopicSub {
let (overlay, publisher) = if broker_id.is_some() && branch.topic_priv_key.is_some() {
(
OverlayId::inner_from_store(&repo.store),
repo.store.inner_overlay(),
Some(PublisherAdvert::new(
branch.topic,
branch.topic_priv_key.to_owned().unwrap(),
@ -36,7 +36,7 @@ impl TopicSub {
)),
)
} else {
(OverlayId::outer(repo.store.id()), None)
(repo.store.outer_overlay(), None)
};
TopicSub::V0(TopicSubV0 {
@ -105,9 +105,12 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> {
req.hash(),
req.topic(),
req.publisher(),
)?;
);
fsm.lock().await.send(res.into()).await?;
fsm.lock()
.await
.send_in_reply_to(res.into(), self.id())
.await?;
Ok(())
}
}

@ -11,15 +11,16 @@
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
pub type Connecting = ();
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Connecting();
impl From<Connecting> for ProtocolMessage {
fn from(msg: Connecting) -> ProtocolMessage {

@ -11,8 +11,9 @@
use crate::broker::BROKER;
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
@ -39,7 +40,7 @@ impl DelUser {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<DelUser, AdminResponse>::new_responder()
Actor::<DelUser, AdminResponse>::new_responder(0)
}
}

@ -11,9 +11,9 @@
use crate::broker::BROKER;
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
@ -55,7 +55,7 @@ impl ListInvitations {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<ListInvitations, AdminResponse>::new_responder()
Actor::<ListInvitations, AdminResponse>::new_responder(0)
}
}

@ -11,9 +11,9 @@
use crate::broker::BROKER;
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
@ -41,7 +41,7 @@ impl ListUsers {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<ListUsers, AdminResponse>::new_responder()
Actor::<ListUsers, AdminResponse>::new_responder(0)
}
}

@ -11,8 +11,9 @@
use std::sync::Arc;
use crate::{actor::*, connection::NoiseFSM, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, connection::NoiseFSM, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use serde::{Deserialize, Serialize};
use std::any::{Any, TypeId};

@ -11,8 +11,9 @@
use crate::connection::NoiseFSM;
use crate::types::{ProbeResponse, MAGIC_NG_REQUEST};
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use serde::{Deserialize, Serialize};
use std::any::{Any, TypeId};
use std::sync::Arc;

@ -15,8 +15,9 @@ use crate::types::{
AdminRequest, CoreBrokerConnect, CoreBrokerConnectResponse, CoreBrokerConnectResponseV0,
CoreMessage, CoreMessageV0, CoreResponse, CoreResponseContentV0, CoreResponseV0, ExtResponse,
};
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use crate::{actor::*, types::ProtocolMessage};
use async_std::sync::Mutex;
use ng_repo::errors::*;
use ng_repo::log::*;
use serde::{Deserialize, Serialize};
use std::any::{Any, TypeId};
@ -71,7 +72,7 @@ pub struct CoreHello {
impl CoreHello {
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<CoreBrokerConnect, CoreBrokerConnectResponse>::new_responder()
Actor::<CoreBrokerConnect, CoreBrokerConnectResponse>::new_responder(0)
}
}
@ -136,7 +137,7 @@ pub struct ExtHello {
impl ExtHello {
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<ExtHello, ExtResponse>::new_responder()
Actor::<ExtHello, ExtResponse>::new_responder(0)
}
}
@ -162,7 +163,7 @@ impl ClientHello {
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<ClientHello, ServerHello>::new_responder()
Actor::<ClientHello, ServerHello>::new_responder(0)
}
}

@ -14,7 +14,6 @@
use crate::actor::EActor;
use crate::actor::SoS;
use crate::connection::*;
use crate::errors::*;
use crate::server_storage::ServerStorage;
use crate::types::*;
use crate::utils::spawn_and_log_error;
@ -25,8 +24,7 @@ use either::Either;
use futures::channel::mpsc;
use futures::SinkExt;
use ng_repo::block_storage::HashMapBlockStorage;
use ng_repo::errors::NgError;
use ng_repo::errors::ObjectParseError;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::object::Object;
use ng_repo::types::*;
@ -68,13 +66,6 @@ pub struct ServerConfig {
pub bootstrap: BootstrapContent,
}
/*pub trait EActor: Send + Sync + std::fmt::Debug {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError>;
}*/
#[async_trait::async_trait]
pub trait ILocalBroker: Send + Sync + EActor {
async fn deliver(&mut self, event: Event);
@ -924,15 +915,15 @@ impl<'a> Broker<'a> {
user: &UserId,
remote_peer_id: &DirectPeerId,
msg: A,
) -> Result<SoS<B>, ProtocolError> {
) -> Result<SoS<B>, NgError> {
let bpi = self
.peers
.get(&(Some(*user), remote_peer_id.to_dh_slice()))
.ok_or(ProtocolError::InvalidValue)?;
.ok_or(NgError::ConnectionNotFound)?;
if let PeerConnection::Client(cnx) = &bpi.connected {
cnx.request(msg).await
} else {
Err(ProtocolError::BrokerError)
Err(NgError::BrokerError)
}
}

@ -1,343 +0,0 @@
/*
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
//! Connection to a Broker, can be local or remote.
//! If remote, it will use a Stream and Sink of framed messages
//! This is the trait
//!
use futures::channel::mpsc;
use futures::{
ready, select,
stream::Stream,
task::{Context, Poll},
Future, FutureExt,
};
use std::pin::Pin;
use std::{collections::HashSet, fmt::Debug};
use crate::errors::*;
use crate::types::*;
use async_broadcast::{broadcast, Receiver};
use futures::{pin_mut, stream, Sink, SinkExt, StreamExt};
use ng_repo::log::*;
use ng_repo::object::*;
use ng_repo::store::*;
use ng_repo::types::*;
use ng_repo::utils::*;
#[async_trait::async_trait]
pub trait BrokerConnection {
type OC: BrokerConnection;
type BlockStream: Stream<Item = Block>;
async fn close(&mut self);
async fn add_user(
&mut self,
user_id: PubKey,
admin_user_pk: PrivKey,
) -> Result<(), ProtocolError>;
async fn del_user(&mut self, user_id: PubKey, admin_user_pk: PrivKey);
async fn add_client(&mut self, client_id: ClientId, user_pk: PrivKey);
async fn del_client(&mut self, client_id: ClientId, user_pk: PrivKey);
async fn overlay_connect(
&mut self,
repo: &RepoLink,
public: bool,
) -> Result<OverlayConnectionClient<Self::OC>, ProtocolError>;
// TODO: remove those 4 functions from trait. they are used internally only. should not be exposed to end-user
async fn process_overlay_request(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<(), ProtocolError>;
async fn process_overlay_request_stream_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<Pin<Box<Self::BlockStream>>, ProtocolError>;
async fn process_overlay_request_objectid_response(
&mut self,
overlay: OverlayId,
request: BrokerOverlayRequestContentV0,
) -> Result<ObjectId, ProtocolError>;
async fn process_overlay_connect(
&mut self,
repo_link: &RepoLink,
public: bool,
) -> Result<OverlayId, ProtocolError> {
let overlay: OverlayId = match public {
true => Digest::Blake3Digest32(*blake3::hash(repo_link.id().slice()).as_bytes()),
false => {
let key: [u8; blake3::OUT_LEN] = blake3::derive_key(
"NextGraph OverlayId BLAKE3 key",
repo_link.secret().slice(),
);
let keyed_hash = blake3::keyed_hash(&key, repo_link.id().slice());
Digest::Blake3Digest32(*keyed_hash.as_bytes())
}
};
let res = self
.process_overlay_request(
overlay,
BrokerOverlayRequestContentV0::OverlayConnect(OverlayConnect::V0()),
)
.await;
match res {
Err(e) => {
if e == ProtocolError::OverlayNotJoined {
log_debug!("OverlayNotJoined");
let res2 = self
.process_overlay_request(
overlay,
BrokerOverlayRequestContentV0::OverlayJoin(OverlayJoin::V0(
OverlayJoinV0 {
secret: repo_link.secret(),
peers: repo_link.peers(),
repo_pubkey: Some(repo_link.id()), //TODO if we know we are connecting to a core node, we can pass None here
},
)),
)
.await?;
} else {
return Err(e);
}
}
Ok(()) => {}
}
log_debug!("OverlayConnectionClient ready");
Ok(overlay)
}
}
pub struct OverlayConnectionClient<'a, T>
where
T: BrokerConnection,
{
broker: &'a mut T,
overlay: OverlayId,
repo_link: RepoLink,
}
impl<'a, T> OverlayConnectionClient<'a, T>
where
T: BrokerConnection,
{
pub fn create(
broker: &'a mut T,
overlay: OverlayId,
repo_link: RepoLink,
) -> OverlayConnectionClient<'a, T> {
OverlayConnectionClient {
broker,
repo_link,
overlay,
}
}
pub fn overlay(repo_link: &RepoLink, public: bool) -> OverlayId {
let overlay: OverlayId = match public {
true => Digest::Blake3Digest32(*blake3::hash(repo_link.id().slice()).as_bytes()),
false => {
let key: [u8; blake3::OUT_LEN] = blake3::derive_key(
"NextGraph OverlayId BLAKE3 key",
repo_link.secret().slice(),
);
let keyed_hash = blake3::keyed_hash(&key, repo_link.id().slice());
Digest::Blake3Digest32(*keyed_hash.as_bytes())
}
};
overlay
}
pub async fn sync_branch(
&mut self,
heads: Vec<ObjectId>,
known_heads: Vec<ObjectId>,
known_commits: BloomFilter,
) -> Result<Pin<Box<T::BlockStream>>, ProtocolError> {
self.broker
.process_overlay_request_stream_response(
self.overlay,
BrokerOverlayRequestContentV0::BranchSyncReq(BranchSyncReq::V0(BranchSyncReqV0 {
heads,
known_heads,
known_commits,
})),
)
.await
}
pub fn leave(&self) {}
pub fn topic_connect(&self, id: TopicId) -> TopicSubscription<T> {
let (s, mut r1) = broadcast(128); // FIXME this should be done only once, in the Broker
TopicSubscription {
id,
overlay_cnx: self,
event_stream: r1.clone(),
}
}
pub async fn delete_object(&mut self, id: ObjectId) -> Result<(), ProtocolError> {
self.broker
.process_overlay_request(
self.overlay,
BrokerOverlayRequestContentV0::ObjectDel(ObjectDel::V0(ObjectDelV0 { id })),
)
.await
}
pub async fn pin_object(&mut self, id: ObjectId) -> Result<(), ProtocolError> {
self.broker
.process_overlay_request(
self.overlay,
BrokerOverlayRequestContentV0::ObjectPin(ObjectPin::V0(ObjectPinV0 { id })),
)
.await
}
pub async fn unpin_object(&mut self, id: ObjectId) -> Result<(), ProtocolError> {
self.broker
.process_overlay_request(
self.overlay,
BrokerOverlayRequestContentV0::ObjectUnpin(ObjectUnpin::V0(ObjectUnpinV0 { id })),
)
.await
}
pub async fn copy_object(
&mut self,
id: ObjectId,
expiry: Option<Timestamp>,
) -> Result<ObjectId, ProtocolError> {
self.broker
.process_overlay_request_objectid_response(
self.overlay,
BrokerOverlayRequestContentV0::ObjectCopy(ObjectCopy::V0(ObjectCopyV0 {
id,
expiry,
})),
)
.await
}
pub async fn get_block(
&mut self,
id: BlockId,
include_children: bool,
topic: Option<PubKey>,
) -> Result<Pin<Box<T::BlockStream>>, ProtocolError> {
self.broker
.process_overlay_request_stream_response(
self.overlay,
BrokerOverlayRequestContentV0::BlockGet(BlockGet::V0(BlockGetV0 {
id,
include_children,
topic,
})),
)
.await
}
pub async fn get_object(
&mut self,
id: ObjectId,
topic: Option<PubKey>,
) -> Result<Object, ProtocolError> {
let mut blockstream = self.get_block(id, true, topic).await?;
let mut store = HashMapBlockStorage::new();
while let Some(block) = blockstream.next().await {
store.put(&block).unwrap();
}
Object::load(id, None, &store).map_err(|e| match e {
ObjectParseError::MissingBlocks(_missing) => ProtocolError::MissingBlocks,
_ => ProtocolError::ObjectParseError,
})
}
pub async fn put_block(&mut self, block: &Block) -> Result<BlockId, ProtocolError> {
self.broker
.process_overlay_request(
self.overlay,
BrokerOverlayRequestContentV0::BlockPut(BlockPut::V0(block.clone())),
)
.await?;
Ok(block.id())
}
// TODO maybe implement a put_block_with_children ? that would behave like put_object, but taking in a parent Blockk instead of a content
pub async fn put_object(
&mut self,
content: ObjectContent,
deps: Vec<ObjectId>,
expiry: Option<Timestamp>,
max_object_size: usize,
repo_pubkey: PubKey,
repo_secret: SymKey,
) -> Result<ObjectId, ProtocolError> {
let obj = Object::new(
content,
deps,
expiry,
max_object_size,
repo_pubkey,
repo_secret,
);
log_debug!("object has {} blocks", obj.blocks().len());
let mut deduplicated: HashSet<ObjectId> = HashSet::new();
for block in obj.blocks() {
let id = block.id();
if deduplicated.get(&id).is_none() {
let _ = self.put_block(block).await?;
deduplicated.insert(id);
}
}
Ok(obj.id())
}
}
pub struct TopicSubscription<'a, T>
where
T: BrokerConnection,
{
id: TopicId,
overlay_cnx: &'a OverlayConnectionClient<'a, T>,
event_stream: Receiver<Event>,
}
impl<'a, T> TopicSubscription<'a, T>
where
T: BrokerConnection,
{
pub fn unsubscribe(&self) {}
pub fn disconnect(&self) {}
pub fn get_branch_heads(&self) {}
pub fn get_event_stream(&self) -> &Receiver<Event> {
&self.event_stream
}
}

@ -21,8 +21,6 @@ use std::sync::Arc;
use crate::actor::{Actor, SoS};
use crate::actors::*;
use crate::broker::BROKER;
use crate::errors::NetError;
use crate::errors::ProtocolError;
use crate::types::*;
use crate::utils::*;
@ -30,6 +28,7 @@ use async_std::stream::StreamExt;
use async_std::sync::Mutex;
use either::Either;
use futures::{channel::mpsc, select, FutureExt, SinkExt};
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::{DirectPeerId, PrivKey, PubKey, UserId, X25519PrivKey};
use ng_repo::utils::{sign, verify};
@ -290,6 +289,17 @@ impl NoiseFSM {
}
pub async fn send(&mut self, msg: ProtocolMessage) -> Result<(), ProtocolError> {
self.send_in_reply_to(msg, 0).await
}
pub async fn send_in_reply_to(
&mut self,
mut msg: ProtocolMessage,
in_reply_to: i64,
) -> Result<(), ProtocolError> {
if in_reply_to != 0 {
msg.set_id(in_reply_to);
}
log_debug!("SENDING: {:?}", msg);
if self.noise_cipher_state_enc.is_some() {
let cipher = self.encrypt(msg)?;
@ -825,7 +835,7 @@ impl NoiseFSM {
if msg.type_id() != TypeId::of::<ClientMessage>() {
return Err(ProtocolError::AccessDenied);
}
let id = msg.id();
let id: i64 = msg.id();
if self.dir.is_server() && id > 0 || !self.dir.is_server() && id < 0 {
return Ok(StepReply::Responder(msg));
} else if id != 0 {
@ -1038,9 +1048,9 @@ impl ConnectionBase {
>(
&self,
msg: A,
) -> Result<SoS<B>, ProtocolError> {
) -> Result<SoS<B>, NgError> {
if self.fsm.is_none() {
return Err(ProtocolError::FsmNotReady);
return Err(NgError::ProtocolError(ProtocolError::FsmNotReady));
}
let mut id = self.next_request_id.next_id();

@ -9,162 +9,10 @@
use core::fmt;
use ng_repo::errors::{ObjectParseError, StorageError};
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use std::convert::From;
use std::error::Error;
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum ServerError {
SequenceMismatch,
FileError,
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum NetError {
DirectionAlreadySet = 1,
WsError,
IoError,
ConnectionError,
SerializationError,
ProtocolError,
AccessDenied,
InternalError,
PeerAlreadyConnected,
Closing,
} //MAX 50 NetErrors
impl Error for NetError {}
impl fmt::Display for NetError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum ProtocolError {
NoError = 0,
PartialContent,
EndOfStream,
IoError,
WsError,
ActorError,
InvalidState,
SignatureError,
InvalidSignature,
SerializationError,
AccessDenied,
InvitationRequired,
OverlayNotJoined,
OverlayNotFound,
BrokerError,
NotFound,
MissingBlocks,
ObjectParseError,
InvalidValue,
AlreadyExists,
RepoIdRequired,
ConnectionError,
Timeout,
Expired,
PeerAlreadyConnected,
OtherError,
NetError,
StorageError,
Closing,
FsmNotReady,
MustBeEncrypted,
NoiseHandshakeFailed,
DecryptionError,
EncryptionError,
WhereIsTheMagic,
RepoAlreadyOpened,
False,
InvalidNonce,
} //MAX 949 ProtocolErrors
impl From<NetError> for ProtocolError {
fn from(e: NetError) -> Self {
match e {
NetError::IoError => ProtocolError::IoError,
NetError::WsError => ProtocolError::WsError,
NetError::ConnectionError => ProtocolError::ConnectionError,
NetError::SerializationError => ProtocolError::SerializationError,
NetError::ProtocolError => ProtocolError::OtherError,
NetError::AccessDenied => ProtocolError::AccessDenied,
NetError::PeerAlreadyConnected => ProtocolError::PeerAlreadyConnected,
NetError::Closing => ProtocolError::Closing,
_ => ProtocolError::NetError,
}
}
}
impl From<StorageError> for ProtocolError {
fn from(e: StorageError) -> Self {
match e {
StorageError::NotFound => ProtocolError::NotFound,
StorageError::InvalidValue => ProtocolError::InvalidValue,
StorageError::BackendError => ProtocolError::StorageError,
StorageError::SerializationError => ProtocolError::SerializationError,
StorageError::AlreadyExists => ProtocolError::AlreadyExists,
_ => ProtocolError::StorageError,
}
}
}
impl ProtocolError {
pub fn is_stream(&self) -> bool {
*self == ProtocolError::PartialContent || *self == ProtocolError::EndOfStream
}
pub fn is_err(&self) -> bool {
*self != ProtocolError::NoError
}
}
impl Error for ProtocolError {}
impl fmt::Display for ProtocolError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<ng_repo::errors::NgError> for ProtocolError {
fn from(e: ng_repo::errors::NgError) -> Self {
match e {
ng_repo::errors::NgError::InvalidSignature => ProtocolError::InvalidSignature,
ng_repo::errors::NgError::SerializationError => ProtocolError::SerializationError,
_ => ProtocolError::OtherError,
}
}
}
impl From<ObjectParseError> for ProtocolError {
fn from(_e: ObjectParseError) -> Self {
ProtocolError::ObjectParseError
}
}
impl From<serde_bare::error::Error> for ProtocolError {
fn from(_e: serde_bare::error::Error) -> Self {
ProtocolError::SerializationError
}
}
impl From<serde_bare::error::Error> for NetError {
fn from(_e: serde_bare::error::Error) -> Self {
NetError::SerializationError
}
}
// impl From<BrokerMessage> for Result<(), ProtocolError> {
// fn from(msg: BrokerMessage) -> Self {
// if !msg.is_response() {

@ -11,10 +11,8 @@
//! Trait for ServerStorage
use crate::{
errors::{ProtocolError, ServerError},
types::*,
};
use crate::types::*;
use ng_repo::errors::*;
use ng_repo::types::*;
pub trait ServerStorage: Send + Sync {
@ -43,7 +41,7 @@ pub trait ServerStorage: Send + Sync {
&self,
overlay: &OverlayId,
repo: &RepoHash,
) -> Result<RepoPinStatus, ProtocolError>;
) -> Result<RepoPinStatus, ServerError>;
fn pin_repo(
&self,
@ -51,7 +49,7 @@ pub trait ServerStorage: Send + Sync {
repo: &RepoHash,
ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>,
) -> Result<RepoOpened, ProtocolError>;
) -> Result<RepoOpened, ServerError>;
fn topic_sub(
&self,
@ -59,5 +57,5 @@ pub trait ServerStorage: Send + Sync {
repo: &RepoHash,
topic: &TopicId,
publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ProtocolError>;
) -> Result<TopicSubRes, ServerError>;
}

@ -16,9 +16,9 @@ use crate::utils::{
is_public_ipv4, is_public_ipv6,
};
use crate::WS_PORT_ALTERNATE;
use crate::{actor::EActor, actors::*, errors::ProtocolError};
use crate::{actor::EActor, actors::*};
use core::fmt;
use ng_repo::errors::NgError;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::*;
use serde::{Deserialize, Serialize};
@ -2806,8 +2806,8 @@ impl RepoPinStatus {
match self {
Self::V0(v0) => {
for sub in &v0.topics {
if sub.is_publisher() {
return true;
if sub.topic_id() == topic {
return sub.is_publisher();
}
}
false
@ -3034,6 +3034,10 @@ impl ObjectDel {
}
}
/// Request to delete an object
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PublishEvent(pub Event, #[serde(skip)] pub Option<OverlayId>);
/// Content of `ClientRequestV0`
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientRequestContentV0 {
@ -3057,7 +3061,7 @@ pub enum ClientRequestContentV0 {
// For InnerOverlay's only :
BlocksPut(BlocksPut),
PublishEvent(Event),
PublishEvent(PublishEvent),
}
impl ClientRequestContentV0 {
@ -3066,6 +3070,7 @@ impl ClientRequestContentV0 {
ClientRequestContentV0::RepoPinStatusReq(a) => a.set_overlay(overlay),
ClientRequestContentV0::TopicSub(a) => a.set_overlay(overlay),
ClientRequestContentV0::PinRepo(a) => {}
ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay),
_ => unimplemented!(),
}
}
@ -3108,7 +3113,10 @@ impl ClientRequest {
pub fn get_actor(&self) -> Box<dyn EActor> {
match self {
Self::V0(ClientRequestV0 { content, .. }) => match content {
ClientRequestContentV0::RepoPinStatusReq(r) => r.get_actor(),
ClientRequestContentV0::RepoPinStatusReq(r) => r.get_actor(self.id()),
ClientRequestContentV0::PinRepo(r) => r.get_actor(self.id()),
ClientRequestContentV0::TopicSub(r) => r.get_actor(self.id()),
ClientRequestContentV0::PublishEvent(r) => r.get_actor(self.id()),
_ => unimplemented!(),
},
}
@ -3119,10 +3127,10 @@ impl TryFrom<ProtocolMessage> for ClientRequestContentV0 {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::ClientMessage(ClientMessage::V0(ClientMessageV0 {
overlay: overlay,
overlay,
content:
ClientMessageContentV0::ClientRequest(ClientRequest::V0(ClientRequestV0 {
content: mut content,
mut content,
..
})),
..
@ -3249,8 +3257,8 @@ pub enum ClientResponse {
V0(ClientResponseV0),
}
impl From<ProtocolError> for ClientResponse {
fn from(err: ProtocolError) -> ClientResponse {
impl From<ServerError> for ClientResponse {
fn from(err: ServerError) -> ClientResponse {
ClientResponse::V0(ClientResponseV0 {
id: 0,
result: err.into(),
@ -3259,6 +3267,25 @@ impl From<ProtocolError> for ClientResponse {
}
}
impl<A> From<Result<A, ServerError>> for ProtocolMessage
where
A: Into<ProtocolMessage> + std::fmt::Debug,
{
fn from(res: Result<A, ServerError>) -> ProtocolMessage {
match res {
Ok(a) => a.into(),
Err(e) => ProtocolMessage::from_client_response_err(e),
}
}
}
impl From<()> for ProtocolMessage {
fn from(msg: ()) -> ProtocolMessage {
let cm: ClientResponse = ServerError::Ok.into();
cm.into()
}
}
impl ClientResponse {
pub fn id(&self) -> i64 {
match self {
@ -3303,7 +3330,7 @@ impl TryFrom<ProtocolMessage> for ClientResponseContentV0 {
if res == 0 {
Ok(content)
} else {
Err(ProtocolError::try_from(res).unwrap())
Err(ProtocolError::ServerError)
}
} else {
log_debug!("INVALID {:?}", msg);
@ -3657,6 +3684,19 @@ pub enum ProtocolMessage {
CoreMessage(CoreMessage),
}
impl TryFrom<&ProtocolMessage> for ServerError {
type Error = NgError;
fn try_from(msg: &ProtocolMessage) -> Result<Self, NgError> {
if let ProtocolMessage::ClientMessage(ref bm) = msg {
let res = bm.result();
if res != 0 {
return Ok(ServerError::try_from(res).unwrap());
}
}
Err(NgError::NotAServerError)
}
}
impl ProtocolMessage {
pub fn id(&self) -> i64 {
match self {
@ -3711,7 +3751,7 @@ impl ProtocolMessage {
}
}
pub fn from_client_response_err(err: ProtocolError) -> ProtocolMessage {
pub fn from_client_response_err(err: ServerError) -> ProtocolMessage {
let res: ClientResponse = err.into();
res.into()
}

@ -28,6 +28,7 @@ rand = { version = "0.7", features = ["getrandom"] }
serde = { version = "1.0.142", features = ["derive"] }
serde_bare = "0.5.0"
serde_bytes = "0.11.7"
num_enum = "0.5.7"
fastbloom-rs = "0.5.3"
hex = "0.4.3"
futures = "0.3.24"

@ -30,7 +30,7 @@ impl BranchV0 {
metadata: Vec<u8>,
) -> BranchV0 {
let topic_privkey: Vec<u8> = vec![];
//TODO: topic_privkey is topic_priv encrypted with RepoWriteCapSecret, TopicId, BranchId
//TODO: use encrypt_topic_priv_key
let topic = topic_priv.to_pub();
BranchV0 {
id,
@ -88,6 +88,7 @@ impl Branch {
}
/// Branch sync request from another peer
///
/// `target_heads` represents the list of heads the requester would like to reach. this list should not be empty.
/// if the requester doesn't know what to reach, the responder should fill this list with their own current local head.
/// `known_heads` represents the list of current heads at the requester replica at the moment of request.
@ -180,21 +181,6 @@ mod test {
//use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Membership};
// struct Test<'a> {
// storage: Box<dyn BlockStorage + Send + Sync + 'a>,
// }
// impl<'a> Test<'a> {
// fn storage(s: impl BlockStorage + 'a) -> Self {
// Test {
// storage: Box::new(s),
// }
// }
// fn s(&self) -> &Box<dyn BlockStorage + Send + Sync + 'a> {
// &self.storage
// }
// }
use crate::branch::*;
use crate::repo::Repo;

@ -1433,21 +1433,6 @@ mod test {
use crate::commit::*;
use crate::log::*;
// struct Test<'a> {
// storage: Box<dyn BlockStorage + Send + Sync + 'a>,
// }
// impl<'a> Test<'a> {
// fn storage(s: impl BlockStorage + 'a) -> Self {
// Test {
// storage: Box::new(s),
// }
// }
// fn s(&self) -> &Box<dyn BlockStorage + Send + Sync + 'a> {
// &self.storage
// }
// }
fn test_commit_header_ref_content_fits(
obj_refs: Vec<BlockRef>,
metadata_size: usize,

@ -10,6 +10,8 @@
//! Errors
use crate::commit::{CommitLoadError, CommitVerifyError};
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use crate::types::BlockId;
use core::fmt;
@ -22,6 +24,8 @@ pub enum NgError {
IncompleteSignature,
SerializationError,
EncryptionError,
InvalidValue,
ConnectionNotFound,
InvalidKey,
InvalidInvitation,
InvalidCreateAccount,
@ -51,8 +55,11 @@ pub enum NgError {
UserNotFound,
TopicNotFound,
NotConnected,
ProtocolError,
ActorError,
ProtocolError(ProtocolError),
ServerError(ServerError),
InvalidResponse,
NotAServerError,
}
impl Error for NgError {}
@ -185,3 +192,172 @@ impl From<serde_bare::error::Error> for StorageError {
StorageError::SerializationError
}
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum ServerError {
Ok = 0,
PartialContent,
EndOfStream,
False,
SequenceMismatch,
FileError,
RepoAlreadyOpened,
}
impl ServerError {
pub fn is_stream(&self) -> bool {
*self == ServerError::PartialContent || *self == ServerError::EndOfStream
}
pub fn is_err(&self) -> bool {
*self != ServerError::Ok
}
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum NetError {
DirectionAlreadySet = 1,
WsError,
IoError,
ConnectionError,
SerializationError,
ProtocolError,
AccessDenied,
InternalError,
PeerAlreadyConnected,
Closing,
} //MAX 50 NetErrors
impl Error for NetError {}
impl fmt::Display for NetError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum ProtocolError {
NoError = 0,
IoError,
WsError,
ActorError,
InvalidState,
SignatureError,
InvalidSignature,
SerializationError,
AccessDenied,
InvitationRequired,
BrokerError,
NotFound,
MissingBlocks,
ObjectParseError,
InvalidValue,
AlreadyExists,
RepoIdRequired,
ConnectionError,
Timeout,
Expired,
PeerAlreadyConnected,
OtherError,
NetError,
StorageError,
ServerError,
Closing,
FsmNotReady,
MustBeEncrypted,
NoiseHandshakeFailed,
DecryptionError,
EncryptionError,
WhereIsTheMagic,
InvalidNonce,
} //MAX 949 ProtocolErrors
impl From<NetError> for ProtocolError {
fn from(e: NetError) -> Self {
match e {
NetError::IoError => ProtocolError::IoError,
NetError::WsError => ProtocolError::WsError,
NetError::ConnectionError => ProtocolError::ConnectionError,
NetError::SerializationError => ProtocolError::SerializationError,
NetError::ProtocolError => ProtocolError::OtherError,
NetError::AccessDenied => ProtocolError::AccessDenied,
NetError::PeerAlreadyConnected => ProtocolError::PeerAlreadyConnected,
NetError::Closing => ProtocolError::Closing,
_ => ProtocolError::NetError,
}
}
}
impl From<StorageError> for ProtocolError {
fn from(e: StorageError) -> Self {
match e {
StorageError::NotFound => ProtocolError::NotFound,
StorageError::InvalidValue => ProtocolError::InvalidValue,
StorageError::BackendError => ProtocolError::StorageError,
StorageError::SerializationError => ProtocolError::SerializationError,
StorageError::AlreadyExists => ProtocolError::AlreadyExists,
_ => ProtocolError::StorageError,
}
}
}
impl From<ProtocolError> for NgError {
fn from(e: ProtocolError) -> Self {
NgError::ProtocolError(e)
}
}
impl From<ServerError> for NgError {
fn from(e: ServerError) -> Self {
NgError::ServerError(e)
}
}
impl ProtocolError {
pub fn is_err(&self) -> bool {
*self != ProtocolError::NoError
}
}
impl Error for ProtocolError {}
impl fmt::Display for ProtocolError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<NgError> for ProtocolError {
fn from(e: NgError) -> Self {
match e {
NgError::InvalidSignature => ProtocolError::InvalidSignature,
NgError::SerializationError => ProtocolError::SerializationError,
_ => ProtocolError::OtherError,
}
}
}
impl From<ObjectParseError> for ProtocolError {
fn from(_e: ObjectParseError) -> Self {
ProtocolError::ObjectParseError
}
}
impl From<serde_bare::error::Error> for ProtocolError {
fn from(_e: serde_bare::error::Error) -> Self {
ProtocolError::SerializationError
}
}
impl From<serde_bare::error::Error> for NetError {
fn from(_e: serde_bare::error::Error) -> Self {
NetError::SerializationError
}
}

@ -717,21 +717,6 @@ mod test {
use std::io::BufReader;
use std::io::Read;
// struct Test<'a> {
// storage: Box<dyn BlockStorage + Send + Sync + 'a>,
// }
// impl<'a> Test<'a> {
// fn storage(s: impl BlockStorage + 'a) -> Self {
// Test {
// storage: Box::new(s),
// }
// }
// fn s(&self) -> &Box<dyn BlockStorage + Send + Sync + 'a> {
// &self.store
// }
// }
/// Checks that a content that does fit in one block, creates an arity of 0
#[test]
pub fn test_depth_0() {

@ -85,6 +85,8 @@ pub struct BranchInfo {
pub topic_priv_key: Option<BranchWriteCapSecret>,
pub read_cap: ReadCap,
pub current_heads: Vec<ObjectRef>,
}
/// In memory Repository representation. With helper functions that access the underlying UserStore and keeps proxy of the values
@ -213,6 +215,15 @@ impl Repo {
None
}
pub fn user_branch(&self) -> Option<&BranchInfo> {
for (_, branch) in self.branches.iter() {
if branch.branch_type == BranchType::User {
return Some(branch);
}
}
None
}
pub fn overlay_branch_read_cap(&self) -> Option<&ReadCap> {
match self.overlay_branch() {
Some(bi) => Some(&bi.read_cap),

@ -35,6 +35,17 @@ pub struct Store {
storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
}
impl From<&Store> for StoreUpdate {
fn from(s: &Store) -> StoreUpdate {
StoreUpdate::V0(StoreUpdateV0 {
store: s.store_repo,
store_read_cap: s.store_readcap.clone(),
overlay_branch_read_cap: s.store_overlay_branch_readcap.clone(),
metadata: vec![],
})
}
}
impl fmt::Debug for Store {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Store.\nstore_repo {:?}", self.store_repo)?;
@ -83,6 +94,11 @@ impl Store {
.get(&self.overlay_id, id)
}
/// fetch a block from broker or core overlay
pub async fn fetch(&self, id: &BlockId) -> Result<Block, StorageError> {
todo!();
}
/// Save a block to the storage.
pub fn put(&self, block: &Block) -> Result<BlockId, StorageError> {
self.storage
@ -107,18 +123,126 @@ impl Store {
.len()
}
/// returns the (branch_commit, add_branch_commit, branch_info)
fn create_branch(
&self,
branch_type: BranchType,
creator: &UserId,
creator_priv_key: &PrivKey,
repo_pub_key: BranchId,
repository_commit_ref: ObjectRef,
root_branch_readcap_id: ObjectId,
repo_write_cap_secret: &RepoWriteCapSecret,
add_branch_deps: Vec<ObjectRef>,
add_branch_acks: Vec<ObjectRef>,
) -> Result<(Commit, Commit, BranchInfo), NgError> {
let (branch_priv_key, branch_pub_key) = generate_keypair();
let (branch_topic_priv_key, branch_topic_pub_key) = generate_keypair();
let branch_commit_body = CommitBody::V0(CommitBodyV0::Branch(Branch::V0(BranchV0 {
id: branch_pub_key,
content_type: BranchContentType::None,
repo: repository_commit_ref,
root_branch_readcap_id,
topic: branch_topic_pub_key,
topic_privkey: Branch::encrypt_topic_priv_key(
&branch_topic_priv_key,
branch_topic_pub_key,
branch_pub_key,
repo_write_cap_secret,
),
metadata: vec![],
})));
let branch_commit = Commit::new_with_body_acks_deps_and_save(
&branch_priv_key,
&branch_pub_key,
branch_pub_key,
QuorumType::NoSigning,
vec![],
vec![],
branch_commit_body,
self,
)?;
let branch_read_cap = branch_commit.reference().unwrap();
log_debug!("{:?} BRANCH COMMIT {}", branch_type, branch_commit);
// creating the AddBranch commit (on root_branch), deps to the RootBranch commit
// author is the owner
let add_branch_commit_body =
CommitBody::V0(CommitBodyV0::AddBranch(AddBranch::V0(AddBranchV0 {
branch_type: branch_type.clone(),
topic_id: branch_topic_pub_key,
branch_read_cap: branch_read_cap.clone(),
})));
let add_branch_commit = Commit::new_with_body_acks_deps_and_save(
creator_priv_key,
creator,
repo_pub_key,
QuorumType::Owners,
add_branch_deps,
add_branch_acks,
add_branch_commit_body,
self,
)?;
log_debug!(
"ADD_BRANCH {:?} BRANCH COMMIT {}",
&branch_type,
add_branch_commit
);
let branch_info = BranchInfo {
id: branch_pub_key,
branch_type,
topic: branch_topic_pub_key,
topic_priv_key: Some(branch_topic_priv_key),
read_cap: branch_read_cap,
current_heads: vec![],
};
Ok((branch_commit, add_branch_commit, branch_info))
}
pub fn create_repo_default(
self: Arc<Self>,
creator: &UserId,
creator_priv_key: &PrivKey,
repo_write_cap_secret: SymKey,
is_store: bool,
is_private_store: bool,
) -> Result<(Repo, Vec<(Commit, Vec<Digest>)>), NgError> {
let (repo_priv_key, repo_pub_key) = generate_keypair();
self.create_repo_with_keys(
creator,
creator_priv_key,
repo_priv_key,
repo_pub_key,
repo_write_cap_secret,
is_store,
is_private_store,
)
}
pub fn create_repo_with_keys(
self: Arc<Self>,
creator: &UserId,
creator_priv_key: &PrivKey,
repo_priv_key: PrivKey,
repo_pub_key: PubKey,
repo_write_cap_secret: SymKey,
is_store: bool,
is_private_store: bool,
) -> Result<(Repo, Vec<(Commit, Vec<Digest>)>), NgError> {
let mut events = Vec::with_capacity(6);
// creating the Repository commit
let (repo_priv_key, repo_pub_key) = generate_keypair();
let repository = Repository::V0(RepositoryV0 {
id: repo_pub_key,
verification_program: vec![],
@ -164,6 +288,7 @@ impl Store {
quorum: None,
reconciliation_interval: RelTime::None,
owners: vec![creator.clone()],
//TODO: add crypto_box of repo_write_cap_secret for creator
metadata: vec![],
})));
@ -189,70 +314,80 @@ impl Store {
// creating the main branch
let (main_branch_priv_key, main_branch_pub_key) = generate_keypair();
let (main_branch_topic_priv_key, main_branch_topic_pub_key) = generate_keypair();
let main_branch_commit_body = CommitBody::V0(CommitBodyV0::Branch(Branch::V0(BranchV0 {
id: main_branch_pub_key,
content_type: BranchContentType::None,
repo: repository_commit_ref.clone(),
root_branch_readcap_id,
topic: main_branch_topic_pub_key,
topic_privkey: Branch::encrypt_topic_priv_key(
&main_branch_topic_priv_key,
main_branch_topic_pub_key,
main_branch_pub_key,
let (main_branch_commit, main_add_branch_commit, mut main_branch_info) =
self.as_ref().create_branch(
BranchType::Main,
creator,
creator_priv_key,
repo_pub_key,
repository_commit_ref.clone(),
root_branch_readcap_id,
&repo_write_cap_secret,
),
metadata: vec![],
})));
let main_branch_commit = Commit::new_with_body_acks_deps_and_save(
&main_branch_priv_key,
&main_branch_pub_key,
main_branch_pub_key,
QuorumType::NoSigning,
vec![],
vec![],
main_branch_commit_body,
&self,
)?;
let branch_read_cap = main_branch_commit.reference().unwrap();
let branch_read_cap_id = branch_read_cap.id;
log_debug!("MAIN BRANCH COMMIT {}", main_branch_commit);
// adding the event for the Branch commit
vec![root_branch_readcap.clone()],
vec![],
)?;
events.push((main_branch_commit, vec![]));
// creating the AddBranch commit (on root_branch), deps to the RootBranch commit
// author is the owner
let add_branch_commit_body =
CommitBody::V0(CommitBodyV0::AddBranch(AddBranch::V0(AddBranchV0 {
branch_type: BranchType::Main,
topic_id: main_branch_topic_pub_key,
branch_read_cap: branch_read_cap.clone(),
})));
let add_branch_commit = Commit::new_with_body_acks_deps_and_save(
creator_priv_key,
creator,
repo_pub_key,
QuorumType::Owners,
vec![root_branch_readcap.clone()],
vec![],
add_branch_commit_body,
&self,
)?;
log_debug!("ADD_BRANCH COMMIT {}", add_branch_commit);
// TODO: optional AddMember and AddPermission, that should be added as deps to the SynSignature below (and to the commits of the SignatureContent)
// using the creator as author (and incrementing their peer's seq_num)
let extra_branches = if is_store {
// creating the store branch
let (store_branch_commit, store_add_branch_commit, store_branch_info) =
self.as_ref().create_branch(
BranchType::Store,
creator,
creator_priv_key,
repo_pub_key,
repository_commit_ref.clone(),
root_branch_readcap_id,
&repo_write_cap_secret,
vec![main_add_branch_commit.reference().unwrap()],
vec![],
)?;
events.push((store_branch_commit, vec![]));
// creating the overlay or user branch
let (
overlay_or_user_branch_commit,
overlay_or_user_add_branch_commit,
overlay_or_user_branch_info,
) = self.as_ref().create_branch(
if is_private_store {
BranchType::User
} else {
BranchType::Overlay
},
creator,
creator_priv_key,
repo_pub_key,
repository_commit_ref.clone(),
root_branch_readcap_id,
&repo_write_cap_secret,
vec![store_add_branch_commit.reference().unwrap()],
vec![],
)?;
events.push((overlay_or_user_branch_commit, vec![]));
Some((
store_add_branch_commit,
store_branch_info,
overlay_or_user_add_branch_commit,
overlay_or_user_branch_info,
))
} else {
None
};
let sync_sign_deps = if is_store {
extra_branches.as_ref().unwrap().2.reference().unwrap()
} else {
main_add_branch_commit.reference().unwrap()
};
// preparing the threshold keys for the unique owner
let mut rng = rand::thread_rng();
let sk_set = SecretKeySet::random(0, &mut rng);
@ -263,12 +398,24 @@ impl Store {
// creating signature for RootBranch, AddBranch and Branch commits
// signed with owner threshold signature (threshold = 0)
let mut signed_commits = vec![
root_branch_readcap_id,
main_add_branch_commit.id().unwrap(),
main_branch_info.read_cap.id,
];
if let Some((store_add_branch, store_branch, oou_add_branch, oou_branch)) = &extra_branches
{
signed_commits.append(&mut vec![
store_add_branch.id().unwrap(),
store_branch.read_cap.id,
oou_add_branch.id().unwrap(),
oou_branch.read_cap.id,
]);
}
let signature_content = SignatureContent::V0(SignatureContentV0 {
commits: vec![
root_branch_readcap_id,
add_branch_commit.id().unwrap(),
branch_read_cap_id,
],
commits: signed_commits,
});
let signature_content_ser = serde_bare::to_vec(&signature_content).unwrap();
@ -277,7 +424,7 @@ impl Store {
.combine_signatures([(0, &sig_share)])
.map_err(|_| NgError::IncompleteSignature)?;
let threshold_sig = ThresholdSignatureV0::Owners((sig));
let threshold_sig = ThresholdSignatureV0::Owners(sig);
// creating root certificate of the repo
@ -304,7 +451,7 @@ impl Store {
0,
&self,
);
let mut cert_obj_blocks = cert_object.save(&self)?;
let cert_obj_blocks = cert_object.save(&self)?;
// finally getting the signature:
@ -321,7 +468,7 @@ impl Store {
0,
&self,
);
let mut sig_obj_blocks = sig_object.save(&self)?;
let sig_obj_blocks = sig_object.save(&self)?;
// keeping the Secret Key Share of the owner
let signer_cap = SignerCap {
@ -342,46 +489,65 @@ impl Store {
creator,
repo_pub_key,
QuorumType::IamTheSignature,
vec![add_branch_commit.reference().unwrap()],
vec![sync_sign_deps],
vec![root_branch_readcap.clone()],
sync_sig_commit_body.clone(),
&self,
)?;
let mut branches = vec![(main_branch_info.id, main_branch_info)];
// adding the event for the sync_sig_on_root_branch_commit
let mut additional_blocks = Vec::with_capacity(
cert_obj_blocks.len() + sig_obj_blocks.len() + add_branch_commit.blocks().len(),
cert_obj_blocks.len() + sig_obj_blocks.len() + main_add_branch_commit.blocks().len(),
);
additional_blocks.extend(cert_obj_blocks.iter());
additional_blocks.extend(sig_obj_blocks.iter());
additional_blocks.extend(add_branch_commit.blocks().iter());
additional_blocks.extend(main_add_branch_commit.blocks().iter());
if let Some((store_add_branch, store_branch_info, oou_add_branch, oou_branch_info)) =
extra_branches
{
additional_blocks.extend(store_add_branch.blocks().iter());
additional_blocks.extend(oou_add_branch.blocks().iter());
branches.push((store_branch_info.id, store_branch_info));
branches.push((oou_branch_info.id, oou_branch_info));
}
let sync_sig_on_root_branch_commit_ref =
sync_sig_on_root_branch_commit.reference().unwrap();
events.push((sync_sig_on_root_branch_commit, additional_blocks));
// creating the SyncSignature for the main branch with deps to the Branch commit and acks also to this commit as it is its direct causal future.
// creating the SyncSignature for the all branches with deps to the Branch commit and acks also to this commit as it is its direct causal future.
let sync_sig_on_main_branch_commit = Commit::new_with_body_acks_deps_and_save(
creator_priv_key,
creator,
main_branch_pub_key,
QuorumType::IamTheSignature,
vec![branch_read_cap.clone()],
vec![branch_read_cap.clone()],
sync_sig_commit_body,
&self,
)?;
for (branch_id, branch_info) in &mut branches {
let sync_sig_on_branch_commit = Commit::new_with_body_acks_deps_and_save(
creator_priv_key,
creator,
*branch_id,
QuorumType::IamTheSignature,
vec![branch_info.read_cap.clone()],
vec![branch_info.read_cap.clone()],
sync_sig_commit_body.clone(),
&self,
)?;
// adding the event for the sync_sig_on_main_branch_commit
let sync_sig_on_branch_commit_ref = sync_sig_on_branch_commit.reference().unwrap();
let mut additional_blocks =
Vec::with_capacity(cert_obj_blocks.len() + sig_obj_blocks.len());
additional_blocks.append(&mut cert_obj_blocks);
additional_blocks.append(&mut sig_obj_blocks);
// adding the event for the sync_sig_on_branch_commit
events.push((sync_sig_on_main_branch_commit, additional_blocks));
let mut additional_blocks =
Vec::with_capacity(cert_obj_blocks.len() + sig_obj_blocks.len());
additional_blocks.extend(cert_obj_blocks.iter());
additional_blocks.extend(sig_obj_blocks.iter());
// TODO: add the CertificateRefresh event on main branch
events.push((sync_sig_on_branch_commit, additional_blocks));
branch_info.current_heads = vec![sync_sig_on_branch_commit_ref];
// TODO: add the CertificateRefresh event on main branch
}
// preparing the Repo
@ -391,15 +557,10 @@ impl Store {
topic: topic_pub_key,
topic_priv_key: Some(topic_priv_key),
read_cap: root_branch_readcap.clone(),
current_heads: vec![sync_sig_on_root_branch_commit_ref],
};
let main_branch = BranchInfo {
id: main_branch_pub_key.clone(),
branch_type: BranchType::Main,
topic: main_branch_topic_pub_key,
topic_priv_key: Some(main_branch_topic_priv_key),
read_cap: branch_read_cap,
};
branches.push((root_branch.id, root_branch));
let repo = Repo {
id: repo_pub_key,
@ -409,10 +570,7 @@ impl Store {
store: Arc::clone(&self),
read_cap: Some(root_branch_readcap),
write_cap: Some(repo_write_cap_secret),
branches: HashMap::from([
(repo_pub_key, root_branch),
(main_branch_pub_key, main_branch),
]),
branches: branches.into_iter().collect(),
opened_branches: HashMap::new(),
};
@ -434,6 +592,15 @@ impl Store {
}
}
pub fn inner_overlay(&self) -> OverlayId {
self.store_repo
.overlay_id_for_write_purpose(&self.store_overlay_branch_readcap.key)
}
pub fn outer_overlay(&self) -> OverlayId {
self.store_repo.outer_overlay()
}
#[allow(deprecated)]
#[cfg(any(test, feature = "testing"))]
pub fn dummy_public_v0() -> Arc<Self> {

@ -525,9 +525,9 @@ impl fmt::Display for OverlayId {
}
impl OverlayId {
pub fn inner_from_store(store: &Store) -> OverlayId {
Self::inner(store.id(), store.get_store_overlay_branch_readcap_secret())
}
// pub fn inner_from_store(store: &Store) -> OverlayId {
// Self::inner(store.id(), store.get_store_overlay_branch_readcap_secret())
// }
pub fn inner(
store_id: &PubKey,
store_overlay_branch_readcap_secret: &ReadCapSecret,
@ -556,6 +556,13 @@ impl OverlayId {
pub fn nil() -> OverlayId {
OverlayId::Outer(Digest::nil())
}
pub fn is_inner(&self) -> bool {
match self {
Self::Inner(_) => true,
_ => false,
}
}
}
/// List of Store Overlay types
@ -603,7 +610,7 @@ impl StoreOverlay {
| StoreOverlay::V0(StoreOverlayV0::ProtectedStore(id))
| StoreOverlay::V0(StoreOverlayV0::PrivateStore(id))
| StoreOverlay::V0(StoreOverlayV0::Group(id)) => OverlayId::outer(id),
StoreOverlay::V0(StoreOverlayV0::Dialog(d)) => unimplemented!(),
StoreOverlay::V0(StoreOverlayV0::Dialog(d)) => OverlayId::Inner(d.clone()),
StoreOverlay::Own(_) => unimplemented!(),
}
}
@ -619,7 +626,7 @@ impl StoreOverlay {
| StoreOverlay::V0(StoreOverlayV0::Group(id)) => {
OverlayId::inner(id, &store_overlay_branch_readcap_secret)
}
StoreOverlay::V0(StoreOverlayV0::Dialog(d)) => unimplemented!(),
StoreOverlay::V0(StoreOverlayV0::Dialog(d)) => OverlayId::Inner(d.clone()),
StoreOverlay::Own(_) => unimplemented!(),
}
}
@ -660,6 +667,12 @@ pub enum StoreRepo {
V0(StoreRepoV0),
}
impl fmt::Display for StoreRepo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "StoreRepo V0 {}", self.repo_id())
}
}
impl StoreRepo {
pub fn repo_id(&self) -> &RepoId {
match self {
@ -683,10 +696,14 @@ impl StoreRepo {
StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey))
}
pub fn outer_overlay(&self) -> OverlayId {
self.overlay_id_for_read_purpose()
}
pub fn overlay_id_for_read_purpose(&self) -> OverlayId {
//let store_overlay: StoreOverlay = self.into();
//store_overlay.overlay_id_for_read_purpose()
OverlayId::outer(self.repo_id())
let store_overlay: StoreOverlay = self.into();
store_overlay.overlay_id_for_read_purpose()
//OverlayId::outer(self.repo_id())
}
pub fn is_private(&self) -> bool {
@ -722,6 +739,21 @@ impl StoreRepo {
Self::V0(StoreRepoV0::Dialog(d)) => OverlayId::Inner(d.1.clone()),
}
}
pub fn overlay_id_for_write_purpose(
&self,
store_overlay_branch_readcap_secret: &ReadCapSecret,
) -> OverlayId {
match self {
Self::V0(StoreRepoV0::PublicStore(id))
| Self::V0(StoreRepoV0::ProtectedStore(id))
| Self::V0(StoreRepoV0::Group(id))
| Self::V0(StoreRepoV0::PrivateStore(id)) => {
OverlayId::inner(id, store_overlay_branch_readcap_secret)
}
Self::V0(StoreRepoV0::Dialog(d)) => OverlayId::Inner(d.1.clone()),
}
}
}
/// Site type
@ -1658,7 +1690,7 @@ pub struct StoreUpdateV0 {
pub store_read_cap: ReadCap,
pub inner_overlay_read_cap: ReadCap,
pub overlay_branch_read_cap: ReadCap,
/// Metadata
#[serde(with = "serde_bytes")]

@ -569,12 +569,20 @@ impl RocksdbKCVStorage {
let res = iter.next();
match res {
Some(Ok(val)) => {
//log_info!("{:?} {:?}", val.0, val.1);
match compare(&val.0, property_end.as_slice()) {
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {
if suffix.is_some() {
if val.0.len() < (key_size + 2)
if val.0.len() < key_size + 2
|| val.0[1 + key_size] != suffix.unwrap()
{
// log_info!(
// "SKIPPED cause suffix {} {} {} {}",
// val.0.len(),
// key_size + 2,
// val.0[1 + key_size],
// suffix.unwrap()
// );
continue;
}
// } else if val.0.len() > (key_size + 1) {
@ -582,7 +590,10 @@ impl RocksdbKCVStorage {
}
vector.push((val.0.to_vec(), val.1.to_vec()));
}
_ => {} //,
_ => {
//log_info!("SKIPPED cause above END");
break;
} //,
}
}
Some(Err(_e)) => return Err(StorageError::BackendError),

@ -14,6 +14,7 @@
use crate::types::*;
use crate::verifier::Verifier;
use ng_repo::errors::NgError;
use ng_repo::store::*;
use ng_repo::types::*;
use ng_repo::utils::{generate_keypair, sign, verify};
use serde::{Deserialize, Serialize};
@ -97,28 +98,123 @@ impl SiteV0 {
let protected_store = Self::site_store_to_store_repo(&protected);
let private_store = Self::site_store_to_store_repo(&private);
verifier.reserve_more(18)?;
verifier.reserve_more(33)?;
let mut signer_caps = Vec::with_capacity(3);
let public_repo = verifier
.new_store_default(&site_pubkey, &user_priv_key, &public_store, false)
.new_store_default(
&site_pubkey,
&user_priv_key,
public_store_privkey,
&public_store,
false,
)
.await?;
let public_store_update: StoreUpdate = public_repo.store.as_ref().into();
signer_caps.push(public_repo.signer.to_owned().unwrap());
let protected_repo = verifier
.new_store_default(&site_pubkey, &user_priv_key, &protected_store, false)
.new_store_default(
&site_pubkey,
&user_priv_key,
protected_store_privkey,
&protected_store,
false,
)
.await?;
let protected_store_update: StoreUpdate = protected_repo.store.as_ref().into();
signer_caps.push(protected_repo.signer.to_owned().unwrap());
let private_repo = verifier
.new_store_default(&site_pubkey, &user_priv_key, &private_store, true)
.new_store_default(
&site_pubkey,
&user_priv_key,
private_store_privkey,
&private_store,
true,
)
.await?;
// TODO: create user branch
// TODO: add the 2 commits in user branch about StoreUpdate of public and protected stores.
signer_caps.push(private_repo.signer.to_owned().unwrap());
let user_branch = private_repo.user_branch().unwrap();
// Creating the StoreUpdate about public store.
let public_store_update_commit_body =
CommitBody::V0(CommitBodyV0::StoreUpdate(public_store_update));
let public_store_update_commit = Commit::new_with_body_acks_deps_and_save(
&user_priv_key,
&site_pubkey,
user_branch.id,
QuorumType::NoSigning,
vec![],
user_branch.current_heads.clone(),
public_store_update_commit_body,
&private_repo.store,
)?;
// Creating the StoreUpdate about protected store.
let protected_store_update_commit_body =
CommitBody::V0(CommitBodyV0::StoreUpdate(protected_store_update));
let protected_store_update_commit = Commit::new_with_body_acks_deps_and_save(
&user_priv_key,
&site_pubkey,
user_branch.id,
QuorumType::NoSigning,
vec![],
vec![public_store_update_commit.reference().unwrap()],
protected_store_update_commit_body,
&private_repo.store,
)?;
let mut current_head = protected_store_update_commit.reference().unwrap();
let private_repo_id = private_repo.id;
let private_store_repo = private_repo.store.get_store_repo().clone();
let private_repo_read_cap = private_repo.read_cap.to_owned().unwrap();
let user_branch_id = user_branch.id;
// Creating the AddSignerCap for each store
let mut commits = Vec::with_capacity(5);
commits.push((public_store_update_commit, vec![]));
commits.push((protected_store_update_commit, vec![]));
for cap in signer_caps {
let add_signer_cap_commit_body = CommitBody::V0(CommitBodyV0::AddSignerCap(
AddSignerCap::V0(AddSignerCapV0 {
cap,
metadata: vec![],
}),
));
let add_signer_cap_commit = Commit::new_with_body_acks_deps_and_save(
&user_priv_key,
&site_pubkey,
user_branch.id,
QuorumType::NoSigning,
vec![],
vec![current_head],
add_signer_cap_commit_body,
&private_repo.store,
)?;
current_head = add_signer_cap_commit.reference().unwrap();
commits.push((add_signer_cap_commit, vec![]));
}
// update the current_heads
verifier.update_current_heads(&private_repo_id, &user_branch_id, vec![current_head])?;
// sending the 5 events
verifier
.new_events(commits, private_repo_id, &private_store_repo)
.await?;
Ok(Self {
site_type: SiteType::Individual((
user_priv_key,
private_repo.read_cap.to_owned().unwrap(),
)),
site_type: SiteType::Individual((user_priv_key, private_repo_read_cap)),
id: site_pubkey,
name: site_name,
public,

@ -21,13 +21,12 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc};
use ng_net::{
connection::NoiseFSM,
errors::ProtocolError,
types::*,
utils::{Receiver, Sender},
};
use ng_repo::{
block_storage::BlockStorage,
errors::{NgError, StorageError},
errors::{NgError, ProtocolError, StorageError},
file::RandomAccessFile,
store::Store,
types::*,

@ -15,9 +15,9 @@ use std::hash::Hash;
use std::hash::Hasher;
use std::time::SystemTime;
use ng_net::errors::ProtocolError;
use ng_net::types::*;
use ng_repo::block_storage::BlockStorage;
use ng_repo::errors::ProtocolError;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::repo::BranchInfo;
@ -27,11 +27,13 @@ use ng_repo::types::BranchId;
use ng_repo::types::BranchType;
use ng_repo::types::BranchWriteCapSecret;
use ng_repo::types::ObjectId;
use ng_repo::types::ObjectRef;
use ng_repo::types::ReadCap;
use ng_repo::types::RepoId;
use ng_repo::types::SymKey;
use ng_repo::types::Timestamp;
use ng_repo::types::TopicId;
use serde_bare::from_slice;
use serde_bare::to_vec;
use super::prop;
@ -80,6 +82,7 @@ impl<'a> BranchStorage<'a> {
&info.branch_type,
&info.topic,
info.topic_priv_key.as_ref(),
&info.current_heads,
storage,
)
}
@ -90,6 +93,7 @@ impl<'a> BranchStorage<'a> {
branch_type: &BranchType,
topic: &TopicId,
publisher: Option<&BranchWriteCapSecret>,
current_heads: &Vec<ObjectRef>,
storage: &'a dyn KCVStorage,
) -> Result<BranchStorage<'a>, StorageError> {
let bs = BranchStorage {
@ -112,6 +116,13 @@ impl<'a> BranchStorage<'a> {
let value = to_vec(privkey)?;
tx.put(Self::PREFIX, &id_ser, Some(Self::PUBLISHER), &value, &None)?;
}
for head in current_heads {
let mut head_ser = to_vec(head)?;
let mut key = Vec::with_capacity(id_ser.len() + head_ser.len());
key.append(&mut id_ser.clone());
key.append(&mut head_ser);
tx.put(Self::PREFIX_HEADS, &key, None, &vec![], &None)?;
}
Ok(())
})?;
Ok(bs)
@ -131,10 +142,34 @@ impl<'a> BranchStorage<'a> {
read_cap: prop(Self::READ_CAP, &props)?,
topic: prop(Self::TOPIC, &props)?,
topic_priv_key: prop(Self::PUBLISHER, &props).ok(),
current_heads: Self::get_all_heads(id, storage)?,
};
Ok(bs)
}
pub fn get_all_heads(
id: &BranchId,
storage: &'a dyn KCVStorage,
) -> Result<Vec<ObjectRef>, StorageError> {
let size = to_vec(&ObjectRef::nil())?.len();
let key_prefix = to_vec(id).unwrap();
let mut res: Vec<ObjectRef> = vec![];
let total_size = key_prefix.len() + size;
for head in storage.get_all_keys_and_values(
Self::PREFIX_HEADS,
total_size,
key_prefix,
None,
&None,
)? {
if head.0.len() == total_size + 1 {
let head: ObjectRef = from_slice(&head.0[1..head.0.len()])?;
res.push(head);
}
}
Ok(res)
}
pub fn exists(&self) -> bool {
self.storage
.get(
@ -153,7 +188,7 @@ impl<'a> BranchStorage<'a> {
self.storage.write_transaction(&mut |tx| {
let key = &to_vec(&self.id)?;
tx.del_all(Self::PREFIX, key, &Self::ALL_PROPERTIES, &None)?;
let size = to_vec(&ObjectId::nil())?.len();
let size = to_vec(&ObjectRef::nil())?.len();
tx.del_all_values(Self::PREFIX_HEADS, key, size, None, &None)?;
Ok(())
})

@ -17,11 +17,12 @@ use std::hash::Hasher;
use std::time::SystemTime;
use either::{Either, Left, Right};
use ng_net::errors::ProtocolError;
use ng_net::types::*;
use ng_repo::block_storage::BlockStorage;
use ng_repo::errors::ProtocolError;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::log::*;
use ng_repo::repo::BranchInfo;
use ng_repo::repo::Repo;
use ng_repo::store::Store;
@ -31,6 +32,7 @@ use ng_repo::types::ReadCap;
use ng_repo::types::RepoId;
use ng_repo::types::RepoWriteCapSecret;
use ng_repo::types::Repository;
use ng_repo::types::SignerCap;
use ng_repo::types::StoreRepo;
use ng_repo::types::SymKey;
use ng_repo::types::Timestamp;
@ -52,8 +54,8 @@ impl<'a> RepoStorage<'a> {
const PREFIX: u8 = b'r';
// repo properties suffixes
const SIGNER_CAP_OWNER: u8 = b'a';
const SIGNER_CAP_PARTIAL: u8 = b'b';
const SIGNER_CAP: u8 = b'a';
//const SIGNER_CAP_PARTIAL: u8 = b'b';
const CHAT_BRANCH: u8 = b'c';
const DEFINITION: u8 = b'd';
const STORE_BRANCH: u8 = b'e';
@ -65,13 +67,13 @@ impl<'a> RepoStorage<'a> {
const QUORUM: u8 = b'q';
const READ_CAP: u8 = b'r';
const STORE_REPO: u8 = b's';
const SIGNER_CAP_TOTAL: u8 = b't';
//const SIGNER_CAP_TOTAL: u8 = b't';
const USER_BRANCH: u8 = b'u';
const WRITE_CAP_SECRET: u8 = b'w';
const ALL_PROPERTIES: [u8; 16] = [
Self::SIGNER_CAP_OWNER,
Self::SIGNER_CAP_PARTIAL,
const ALL_PROPERTIES: [u8; 14] = [
Self::SIGNER_CAP,
//Self::SIGNER_CAP_PARTIAL,
Self::CHAT_BRANCH,
Self::DEFINITION,
Self::STORE_BRANCH,
@ -83,7 +85,7 @@ impl<'a> RepoStorage<'a> {
Self::QUORUM,
Self::READ_CAP,
Self::STORE_REPO,
Self::SIGNER_CAP_TOTAL,
//Self::SIGNER_CAP_TOTAL,
Self::USER_BRANCH,
Self::WRITE_CAP_SECRET,
];
@ -111,6 +113,7 @@ impl<'a> RepoStorage<'a> {
&repo.id,
repo.read_cap.as_ref().unwrap(),
repo.write_cap.as_ref(),
repo.signer.as_ref(),
repo.store.get_store_repo(),
&repo.repo_def,
&repo.branches,
@ -118,11 +121,11 @@ impl<'a> RepoStorage<'a> {
)
}
// TODO: signers
pub fn create(
id: &RepoId,
read_cap: &ReadCap,
write_cap: Option<&RepoWriteCapSecret>,
signer_cap: Option<&SignerCap>,
store_repo: &StoreRepo,
repo_def: &Repository,
branches: &HashMap<BranchId, BranchInfo>,
@ -136,6 +139,16 @@ impl<'a> RepoStorage<'a> {
return Err(StorageError::AlreadyExists);
}
let mut store_branch = None;
// FIXME: use the same transaction for all branches and the repo
for branch in branches.values() {
BranchStorage::create_from_info(branch, storage)?;
if branch.branch_type == BranchType::Store {
store_branch = Some(branch.id);
}
}
storage.write_transaction(&mut |tx| {
let id_ser = to_vec(&id)?;
let value = to_vec(read_cap)?;
@ -154,13 +167,30 @@ impl<'a> RepoStorage<'a> {
&None,
)?;
}
if let Some(sb) = store_branch {
let value = to_vec(&sb)?;
tx.put(
Self::PREFIX,
&id_ser,
Some(Self::STORE_BRANCH),
&value,
&None,
)?;
}
if let Some(sc) = signer_cap {
let value = to_vec(sc)?;
tx.put(Self::PREFIX, &id_ser, Some(Self::SIGNER_CAP), &value, &None)?;
}
for branch in branches.keys() {
let mut branch_ser = to_vec(branch)?;
let mut key = Vec::with_capacity(id_ser.len() + branch_ser.len());
key.append(&mut id_ser.clone());
key.append(&mut branch_ser);
tx.put(Self::PREFIX_BRANCHES, &key, None, &vec![], &None)?;
}
Ok(())
})?;
for branch in branches.values() {
BranchStorage::create_from_info(branch, storage)?;
}
Ok(repo)
}
@ -169,6 +199,7 @@ impl<'a> RepoStorage<'a> {
store: Either<Arc<Store>, Arc<RwLock<dyn BlockStorage + Send + Sync>>>,
storage: &'a dyn KCVStorage,
) -> Result<Repo, StorageError> {
//("LOADING repo {}", id);
let branch_ids = Self::get_all_branches(id, storage)?;
let mut branches = HashMap::new();
let mut overlay_branch_read_cap = None;
@ -177,6 +208,8 @@ impl<'a> RepoStorage<'a> {
if info.branch_type == BranchType::Overlay {
overlay_branch_read_cap = Some(info.read_cap.clone());
}
//log_info!("LOADING BRANCH INFO {}", branch);
//log_info!("TOPIC {}", info.topic);
let _ = branches.insert(branch, info);
}
@ -214,8 +247,7 @@ impl<'a> RepoStorage<'a> {
repo_def: prop(Self::DEFINITION, &props)?,
read_cap: prop(Self::READ_CAP, &props)?,
write_cap: prop(Self::WRITE_CAP_SECRET, &props).ok(),
//TODO: signer
signer: None,
signer: prop(Self::SIGNER_CAP, &props).ok(),
//TODO: members
members: HashMap::new(),
branches,
@ -246,7 +278,8 @@ impl<'a> RepoStorage<'a> {
let size = to_vec(&BranchId::nil())?.len();
let key_prefix = to_vec(id).unwrap();
let mut res: Vec<BranchId> = vec![];
let total_size = key_prefix.len() + size;
let key_prefix_len = key_prefix.len();
let total_size = key_prefix_len + size;
for branch in storage.get_all_keys_and_values(
Self::PREFIX_BRANCHES,
total_size,
@ -255,7 +288,8 @@ impl<'a> RepoStorage<'a> {
&None,
)? {
if branch.0.len() == total_size + 1 {
let branch_id: BranchId = from_slice(&branch.0[1..branch.0.len()])?;
let branch_id: BranchId =
from_slice(&branch.0[1 + key_prefix_len..total_size + 1])?;
res.push(branch_id);
}
}
@ -265,6 +299,7 @@ impl<'a> RepoStorage<'a> {
pub fn get_all_store_and_repo_ids(
storage: &'a dyn KCVStorage,
) -> Result<HashMap<StoreRepo, Vec<RepoId>>, StorageError> {
//log_info!("get_all_store_and_repo_ids");
let mut res = HashMap::new();
let size = to_vec(&RepoId::nil())?.len();
let mut store_ids = HashSet::new();
@ -275,7 +310,8 @@ impl<'a> RepoStorage<'a> {
Some(Self::STORE_BRANCH),
&None,
)? {
let store_id: RepoId = from_slice(&store_id_ser)?;
let store_id: RepoId = from_slice(&store_id_ser[1..1 + size])?;
//log_info!("FOUND store_id {}", store_id);
store_ids.insert(store_id);
}
let mut repo_ids = HashMap::new();
@ -286,7 +322,8 @@ impl<'a> RepoStorage<'a> {
Some(Self::STORE_REPO),
&None,
)? {
let repo_id: RepoId = from_slice(&repo_id_ser)?;
let repo_id: RepoId = from_slice(&repo_id_ser[1..1 + size])?;
//log_info!("FOUND repo_id {}", repo_id);
let store_repo: StoreRepo = from_slice(&store_repo_ser)?;
repo_ids.insert(repo_id, store_repo);
}
@ -294,12 +331,14 @@ impl<'a> RepoStorage<'a> {
for store in store_ids.iter() {
let store_repo = repo_ids.get(store).ok_or(StorageError::NotAStoreRepo)?;
res.insert(*store_repo, vec![]);
//log_info!("INSERTED store_id {}", store);
}
for (repo_id, store_repo) in repo_ids.iter() {
if store_ids.get(repo_id).is_none() {
let repos = res.get_mut(store_repo).ok_or(StorageError::NotFound)?;
repos.push(*repo_id);
//log_info!("INSERTED repo_id {}", repo_id);
}
}

@ -17,7 +17,7 @@ use ng_repo::log::*;
use ng_repo::object::Object;
use ng_repo::{
block_storage::BlockStorage,
errors::{NgError, StorageError},
errors::{NgError, ProtocolError, ServerError, StorageError},
file::RandomAccessFile,
repo::Repo,
store::Store,
@ -40,7 +40,6 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc};
use ng_net::{
connection::NoiseFSM,
errors::ProtocolError,
types::*,
utils::{Receiver, Sender},
};
@ -112,12 +111,19 @@ impl Verifier {
}
pub fn load(&mut self) -> Result<(), NgError> {
// log_info!(
// "SHOULD LOAD? {} {} {}",
// self.is_persistent(),
// self.user_storage.is_some(),
// self.block_storage.is_some()
// );
if self.is_persistent() && self.user_storage.is_some() && self.block_storage.is_some() {
let user_storage = Arc::clone(self.user_storage.as_ref().unwrap());
//log_info!("LOADING ...");
let stores = user_storage.get_all_store_and_repo_ids()?;
for (store, repos) in stores.iter() {
//log_info!("LOADING STORE: {}", store);
let repo = user_storage
.load_store(store, Arc::clone(self.block_storage.as_ref().unwrap()))?;
self.stores.insert(
@ -128,6 +134,7 @@ impl Verifier {
self.add_repo_without_saving(repo);
for repo_id in repos {
//log_info!("LOADING REPO: {}", repo_id);
let repo = user_storage.load_repo(repo_id, Arc::clone(&store))?;
self.add_repo_without_saving(repo);
}
@ -204,15 +211,15 @@ impl Verifier {
pub fn get_repo_mut(
&mut self,
id: RepoId,
id: &RepoId,
store_repo: &StoreRepo,
) -> Result<&mut Repo, NgError> {
let store = self.get_store(store_repo);
let repo_ref = self.repos.get_mut(&id).ok_or(NgError::RepoNotFound);
let repo_ref = self.repos.get_mut(id).ok_or(NgError::RepoNotFound);
// .or_insert_with(|| {
// // load from storage
// Repo {
// id,
// id: *id,
// repo_def: Repository::new(&PubKey::nil(), &vec![]),
// read_cap: None,
// write_cap: None,
@ -240,6 +247,21 @@ impl Verifier {
self.stores.insert(overlay_id, store);
}
pub(crate) fn update_current_heads(
&mut self,
repo_id: &RepoId,
branch_id: &BranchId,
current_heads: Vec<ObjectRef>,
) -> Result<(), NgError> {
let repo = self.repos.get_mut(repo_id).ok_or(NgError::RepoNotFound)?;
let branch = repo
.branches
.get_mut(branch_id)
.ok_or(NgError::BranchNotFound)?;
branch.current_heads = current_heads;
Ok(())
}
pub(crate) async fn new_event(
&mut self,
commit: &Commit,
@ -254,6 +276,19 @@ impl Verifier {
.await
}
pub(crate) async fn new_event_with_repo(
&mut self,
commit: &Commit,
additional_blocks: &Vec<BlockId>,
repo: &Repo,
) -> Result<(), NgError> {
if self.last_seq_num + 1 >= self.max_reserved_seq_num {
self.reserve_more(1)?;
}
self.new_event_with_repo_(commit, additional_blocks, repo)
.await
}
async fn new_event_(
&mut self,
commit: &Commit,
@ -267,7 +302,7 @@ impl Verifier {
let repo = self.get_repo(repo_id, store_repo)?;
let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?;
self.send_or_save_event_to_outbox(event, repo.store.overlay_id)
self.send_or_save_event_to_outbox(event, repo.store.inner_overlay())
.await?;
Ok(())
}
@ -283,7 +318,7 @@ impl Verifier {
let seq_num = self.last_seq_num;
let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?;
self.send_or_save_event_to_outbox(event, repo.store.overlay_id)
self.send_or_save_event_to_outbox(event, repo.store.inner_overlay())
.await?;
Ok(())
}
@ -414,7 +449,7 @@ impl Verifier {
// send the event to the server already
let broker = BROKER.write().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.as_ref().unwrap().to_owned();
let remote = self.connected_server_id.to_owned().unwrap();
self.send_event(event, &broker, &user, &remote, overlay)
.await?;
} else {
@ -465,6 +500,8 @@ impl Verifier {
remote: &DirectPeerId,
overlay: OverlayId,
) -> Result<(), NgError> {
assert!(overlay.is_inner());
//log_info!("searching for topic {} {}", overlay, event.topic_id());
let (repo_id, branch_id) = self
.topics
.get(&(overlay, *event.topic_id()))
@ -484,24 +521,28 @@ impl Verifier {
.request::<RepoPinStatusReq, RepoPinStatus>(user, remote, msg)
.await
{
Err(ProtocolError::False) | Err(ProtocolError::RepoAlreadyOpened) => {
Err(NgError::ServerError(ServerError::False))
| Err(NgError::ServerError(ServerError::RepoAlreadyOpened)) => {
// pinning the repo on the server broker
let pin_req;
{
let repo = self.repos.get(&repo_id).ok_or(NgError::RepoNotFound)?;
pin_req = PinRepo::from_repo(repo, remote);
}
if let Ok(SoS::Single(opened)) = broker
match broker
.request::<PinRepo, RepoOpened>(user, remote, pin_req)
.await
{
self.repo_was_opened(&repo_id, &opened)?;
//TODO: check that in the returned opened_repo, the branch we are interested in has effectively been subscribed as publisher by the broker.
} else {
return Err(NgError::ProtocolError);
Ok(SoS::Single(opened)) => {
//log_info!("OPENED {:?}", opened);
self.repo_was_opened(&repo_id, &opened)?;
//TODO: check that in the returned opened_repo, the branch we are interested in has effectively been subscribed as publisher by the broker.
}
Ok(_) => return Err(NgError::InvalidResponse),
Err(e) => return Err(e),
}
}
Err(_) => return Err(NgError::ProtocolError),
Err(e) => return Err(e),
Ok(SoS::Single(pin_status)) => {
// checking that the branch is subscribed as publisher
@ -520,24 +561,27 @@ impl Verifier {
.request::<TopicSub, TopicSubRes>(user, remote, topic_sub)
.await
{
Ok(_) => {
Ok(SoS::Single(sub)) => {
// TODO, deal with heads
// update Repo locally
let repo =
self.repos.get_mut(&repo_id).ok_or(NgError::RepoNotFound)?;
repo.opened_branches.insert(*event.topic_id(), true);
Self::branch_was_opened(&self.topics, repo, &sub)?;
}
Err(_) => {
return Err(NgError::BrokerError);
Ok(_) => return Err(NgError::InvalidResponse),
Err(e) => {
return Err(e);
}
}
}
}
_ => return Err(NgError::ActorError),
_ => return Err(NgError::InvalidResponse),
}
// TODO: deal with received known_heads.
// DO a TopicSync
}
let _ = broker
.request::<PublishEvent, ()>(user, remote, PublishEvent::new(event, overlay))
.await?;
Ok(())
}
@ -669,6 +713,7 @@ impl Verifier {
if verif.config.config_type.should_load_last_seq_num() {
verif.take_some_peer_last_seq_numbers(0)?;
verif.last_seq_num = verif.max_reserved_seq_num;
verif.last_reservation = SystemTime::UNIX_EPOCH;
}
Ok(verif)
}
@ -699,7 +744,7 @@ impl Verifier {
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
let repo_ref = self.add_repo_(repo);
let repo_ref: &Repo = self.add_repo_(repo);
// save in user_storage
if user_storage.is_some() {
let _ = user_storage.unwrap().save_repo(repo_ref);
@ -709,20 +754,35 @@ impl Verifier {
fn add_repo_(&mut self, repo: Repo) -> &Repo {
for (branch_id, info) in repo.branches.iter() {
let overlay_id = repo.store.overlay_id.clone();
//log_info!("LOADING BRANCH: {}", branch_id);
let overlay_id: OverlayId = repo.store.inner_overlay();
let topic_id = info.topic.clone();
//log_info!("LOADING TOPIC: {} {}", overlay_id, topic_id);
let repo_id = repo.id.clone();
let branch_id = branch_id.clone();
assert_eq!(
self.topics
.insert((overlay_id, topic_id), (repo_id, branch_id)),
None
);
let res = self
.topics
.insert((overlay_id, topic_id), (repo_id, branch_id));
assert_eq!(res, None);
}
let repo_ref = self.repos.entry(repo.id).or_insert(repo);
repo_ref
}
fn branch_was_opened(
topics: &HashMap<(OverlayId, PubKey), (PubKey, PubKey)>,
repo: &mut Repo,
sub: &TopicSubRes,
) -> Result<(), NgError> {
let overlay = repo.store.inner_overlay();
//log_info!("branch_was_opened searching for topic {}", sub.topic_id());
let (_, branch_id) = topics
.get(&(overlay, *sub.topic_id()))
.ok_or(NgError::TopicNotFound)?;
repo.opened_branches.insert(*branch_id, sub.is_publisher());
Ok(())
}
fn repo_was_opened(
&mut self,
repo_id: &RepoId,
@ -730,8 +790,7 @@ impl Verifier {
) -> Result<(), NgError> {
let repo = self.repos.get_mut(repo_id).ok_or(NgError::RepoNotFound)?;
for sub in opened_repo {
repo.opened_branches
.insert(*sub.topic_id(), sub.is_publisher());
Self::branch_was_opened(&self.topics, repo, sub)?;
}
Ok(())
}
@ -740,6 +799,7 @@ impl Verifier {
&'a mut self,
creator: &UserId,
creator_priv_key: &PrivKey,
priv_key: PrivKey,
store_repo: &StoreRepo,
private: bool,
) -> Result<&'a Repo, NgError> {
@ -767,13 +827,17 @@ impl Verifier {
);
Arc::new(store)
});
let (repo, proto_events) = Arc::clone(store).create_repo_default(
let (repo, proto_events) = Arc::clone(store).create_repo_with_keys(
creator,
creator_priv_key,
priv_key,
store_repo.repo_id().clone(),
repo_write_cap_secret,
true,
private,
)?;
self.new_events_with_repo(proto_events, &repo).await?;
let repo = self.complete_site_store(store_repo, repo)?;
self.new_events_with_repo(proto_events, &repo).await?;
let repo_ref = self.add_repo_and_save(repo);
Ok(repo_ref)
}
@ -787,8 +851,13 @@ impl Verifier {
) -> Result<&'a Repo, NgError> {
let store = self.get_store_or_load(store_repo);
let repo_write_cap_secret = SymKey::random();
let (repo, proto_events) =
store.create_repo_default(creator, creator_priv_key, repo_write_cap_secret)?;
let (repo, proto_events) = store.create_repo_default(
creator,
creator_priv_key,
repo_write_cap_secret,
false,
false,
)?;
self.new_events_with_repo(proto_events, &repo).await?;
let repo_ref = self.add_repo_and_save(repo);
Ok(repo_ref)

@ -28,8 +28,8 @@ use zeroize::Zeroize;
use ng_client_ws::remote_ws::ConnectionWebSocket;
use ng_net::broker::BROKER;
use ng_net::errors::*;
use ng_net::types::*;
use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::types::*;

@ -1,587 +0,0 @@
fn block_size() -> usize {
store_max_value_size()
//store_valid_value_size(0)
}
async fn test_sync(cnx: &mut impl BrokerConnection, user_pub_key: PubKey, userpriv_key: PrivKey) {
fn add_obj(
content: ObjectContent,
deps: Vec<ObjectId>,
expiry: Option<Timestamp>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl BlockStorage,
) -> ObjectRef {
let max_object_size = 4000;
let obj = Object::new(
content,
deps,
expiry,
max_object_size,
repo_pubkey,
repo_secret,
);
//log_debug!(">>> add_obj");
log_debug!(" id: {}", obj.id());
//log_debug!(" deps: {:?}", obj.deps());
obj.save(store).unwrap();
obj.reference().unwrap()
}
fn add_commit(
branch: ObjectRef,
author_privkey: PrivKey,
author_pubkey: PubKey,
seq: u32,
deps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
body_ref: ObjectRef,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl BlockStorage,
) -> ObjectRef {
let mut obj_deps: Vec<ObjectId> = vec![];
obj_deps.extend(deps.iter().map(|r| r.id));
obj_deps.extend(acks.iter().map(|r| r.id));
let obj_ref = ObjectRef {
id: ObjectId::Blake3Digest32([1; 32]),
key: SymKey::ChaCha20Key([2; 32]),
};
let refs = vec![obj_ref];
let metadata = vec![5u8; 55];
let expiry = None;
let commit = Commit::new(
author_privkey,
author_pubkey,
seq,
branch,
deps,
acks,
refs,
metadata,
body_ref,
expiry,
)
.unwrap();
//log_debug!("commit: {}", commit.id().unwrap());
add_obj(
ObjectContent::Commit(commit),
obj_deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_branch(
branch: Branch,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl BlockStorage,
) -> ObjectRef {
let deps = vec![];
let expiry = None;
let body = CommitBody::Branch(branch);
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_trans(
deps: Vec<ObjectId>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl BlockStorage,
) -> ObjectRef {
let expiry = None;
let content = [7u8; 777].to_vec();
let body = CommitBody::Transaction(Transaction::V0(content));
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_ack(
deps: Vec<ObjectId>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl BlockStorage,
) -> ObjectRef {
let expiry = None;
let body = CommitBody::Ack(Ack::V0());
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
let mut store = HashMapBlockStorage::new();
let mut rng = OsRng {};
// repo
let repo_keypair: Keypair = Keypair::generate(&mut rng);
// log_debug!(
// "repo private key: ({}) {:?}",
// repo_keypair.secret.as_bytes().len(),
// repo_keypair.secret.as_bytes()
// );
// log_debug!(
// "repo public key: ({}) {:?}",
// repo_keypair.public.as_bytes().len(),
// repo_keypair.public.as_bytes()
// );
let _repo_privkey = PrivKey::Ed25519PrivKey(repo_keypair.secret.to_bytes());
let repo_pubkey = PubKey::Ed25519PubKey(repo_keypair.public.to_bytes());
let repo_secret = SymKey::ChaCha20Key([9; 32]);
let repolink = RepoLink::V0(RepoLinkV0 {
id: repo_pubkey,
secret: repo_secret,
peers: vec![],
});
// branch
let branch_keypair: Keypair = Keypair::generate(&mut rng);
//log_debug!("branch public key: {:?}", branch_keypair.public.as_bytes());
let branch_pubkey = PubKey::Ed25519PubKey(branch_keypair.public.to_bytes());
let member_keypair: Keypair = Keypair::generate(&mut rng);
//log_debug!("member public key: {:?}", member_keypair.public.as_bytes());
let member_privkey = PrivKey::Ed25519PrivKey(member_keypair.secret.to_bytes());
let member_pubkey = PubKey::Ed25519PubKey(member_keypair.public.to_bytes());
let metadata = [66u8; 64].to_vec();
let commit_types = vec![CommitType::Ack, CommitType::Transaction];
let secret = SymKey::ChaCha20Key([0; 32]);
let member = MemberV0::new(member_pubkey, commit_types, metadata.clone());
let members = vec![member];
let mut quorum = HashMap::new();
quorum.insert(CommitType::Transaction, 3);
let ack_delay = RelTime::Minutes(3);
let tags = [99u8; 32].to_vec();
let branch = Branch::new(
branch_pubkey,
branch_pubkey,
secret,
members,
quorum,
ack_delay,
tags,
metadata,
);
//log_debug!("branch: {:?}", branch);
log_debug!("branch deps/acks:");
log_debug!("");
log_debug!(" br");
log_debug!(" / \\");
log_debug!(" t1 t2");
log_debug!(" / \\ / \\");
log_debug!(" a3 t4<--t5-->(t1)");
log_debug!(" / \\");
log_debug!(" a6 a7");
log_debug!("");
// commit bodies
let branch_body = add_body_branch(
branch.clone(),
repo_pubkey.clone(),
repo_secret.clone(),
&mut store,
);
let ack_body = add_body_ack(vec![], repo_pubkey, repo_secret, &mut store);
let trans_body = add_body_trans(vec![], repo_pubkey, repo_secret, &mut store);
// create & add commits to store
log_debug!(">> br");
let br = add_commit(
branch_body,
member_privkey,
member_pubkey,
0,
vec![],
vec![],
branch_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t1");
let t1 = add_commit(
branch_body,
member_privkey,
member_pubkey,
1,
vec![br],
vec![],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t2");
let t2 = add_commit(
branch_body,
member_privkey,
member_pubkey,
2,
vec![br],
vec![],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a3");
let a3 = add_commit(
branch_body,
member_privkey,
member_pubkey,
3,
vec![t1],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t4");
let t4 = add_commit(
branch_body,
member_privkey,
member_pubkey,
4,
vec![t2],
vec![t1],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t5");
let t5 = add_commit(
branch_body,
member_privkey,
member_pubkey,
5,
vec![t1, t2],
vec![t4],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a6");
let a6 = add_commit(
branch_body,
member_privkey,
member_pubkey,
6,
vec![t4],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a7");
let a7 = add_commit(
branch_body,
member_privkey,
member_pubkey,
7,
vec![t4],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
let mut public_overlay_cnx = cnx
.overlay_connect(&repolink, true)
.await
.expect("overlay_connect failed");
// Sending everything to the broker
for (v) in store.get_all() {
//log_debug!("SENDING {}", k);
let _ = public_overlay_cnx
.put_block(&v)
.await
.expect("put_block failed");
}
// Now emptying the local store of the client, and adding only 1 commit into it (br)
// we also have received an commit (t5) but we don't know what to do with it...
let mut store = HashMapBlockStorage::new();
let br = add_commit(
branch_body,
member_privkey,
member_pubkey,
0,
vec![],
vec![],
branch_body,
repo_pubkey,
repo_secret,
&mut store,
);
let t5 = add_commit(
branch_body,
member_privkey,
member_pubkey,
5,
vec![t1, t2],
vec![t4],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!("LOCAL STORE HAS {} BLOCKS", store.get_len());
// Let's pretend that we know that the head of the branch in the broker is at commits a6 and a7.
// normally it would be the pub/sub that notifies us of those heads.
// now we want to synchronize with the broker.
let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
for commit_ref in [br, t5] {
match commit_ref.id {
ObjectId::Blake3Digest32(d) => filter.add(&d),
}
}
let cfg = filter.config();
let known_commits = BloomFilter {
k: cfg.hashes,
f: filter.get_u8_array().to_vec(),
};
let known_heads = [br.id];
let remote_heads = [a6.id, a7.id];
let mut synced_blocks_stream = public_overlay_cnx
.sync_branch(remote_heads.to_vec(), known_heads.to_vec(), known_commits)
.await
.expect("sync_branch failed");
let mut i = 0;
while let Some(b) = synced_blocks_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
store.put(&b);
i += 1;
}
log_debug!("SYNCED {} BLOCKS", i);
log_debug!("LOCAL STORE HAS {} BLOCKS", store.get_len());
// now the client can verify the DAG and each commit. Then update its list of heads.
}
async fn test(
cnx: &mut impl BrokerConnection,
pub_key: PubKey,
priv_key: PrivKey,
) -> Result<(), ProtocolError> {
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await?;
cnx.add_user(pub_key, priv_key).await?;
//.expect("add_user 2 (myself) failed");
assert_eq!(
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await
.err()
.unwrap(),
ProtocolError::UserAlreadyExists
);
let repo = RepoLink::V0(RepoLinkV0 {
id: PubKey::Ed25519PubKey([1; 32]),
secret: SymKey::ChaCha20Key([0; 32]),
peers: vec![],
});
let mut public_overlay_cnx = cnx.overlay_connect(&repo, true).await?;
log_debug!("put_block");
let my_block_id = public_overlay_cnx
.put_block(&Block::new(
vec![],
ObjectDeps::ObjectIdList(vec![]),
None,
vec![27; 150],
None,
))
.await?;
log_debug!("added block_id to store {}", my_block_id);
let object_id = public_overlay_cnx
.put_object(
ObjectContent::File(File::V0(FileV0 {
content_type: vec![],
metadata: vec![],
content: vec![48; 69000],
})),
vec![],
None,
block_size(),
repo.id(),
repo.secret(),
)
.await?;
log_debug!("added object_id to store {}", object_id);
let mut my_block_stream = public_overlay_cnx
.get_block(my_block_id, true, None)
.await?;
//.expect("get_block failed");
while let Some(b) = my_block_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
}
let mut my_object_stream = public_overlay_cnx.get_block(object_id, true, None).await?;
//.expect("get_block for object failed");
while let Some(b) = my_object_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
}
let object = public_overlay_cnx.get_object(object_id, None).await?;
//.expect("get_object failed");
log_debug!("GOT OBJECT with ID {}", object.id());
// let object_id = public_overlay_cnx
// .copy_object(object_id, Some(now_timestamp() + 60))
// .await
// .expect("copy_object failed");
// log_debug!("COPIED OBJECT to OBJECT ID {}", object_id);
public_overlay_cnx.delete_object(object_id).await?;
//.expect("delete_object failed");
let res = public_overlay_cnx
.get_object(object_id, None)
.await
.unwrap_err();
log_debug!("result from get object after delete: {}", res);
assert_eq!(res, ProtocolError::NotFound);
//TODO test pin/unpin
// TEST BRANCH SYNC
test_sync(cnx, pub_key, priv_key).await;
Ok(())
}
async fn test_local_connection() {
log_debug!("===== TESTING LOCAL API =====");
let root = tempfile::Builder::new().prefix("ngcli").tempdir().unwrap();
let master_key: [u8; 32] = [0; 32];
std::fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let store = LmdbKCVStorage::open(root.path(), master_key);
//let mut server = BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
let (priv_key, pub_key) = generate_keypair();
// let mut cnx = server.local_connection(pub_key);
// test(&mut cnx, pub_key, priv_key).await;
}
async fn test_remote_connection(url: &str) {
log_debug!("===== TESTING REMOTE API =====");
let (priv_key, pub_key) = generate_keypair();
// open cnx
// test(&mut cnx, pub_key, priv_key).await;
}
#[cfg(test)]
mod test {
use crate::{test_local_connection, test_remote_connection};
#[async_std::test]
pub async fn test_local_cnx() {}
use async_std::task;
use ng_broker::server_ws::*;
use ng_net::utils::gen_dh_keys;
use ng_net::WS_PORT;
use ng_repo::log::*;
use ng_repo::types::PubKey;
#[async_std::test]
pub async fn test_remote_cnx() -> Result<(), Box<dyn std::error::Error>> {
let keys = gen_dh_keys();
// log_debug!("Public key of node: {:?}", keys.1);
// log_debug!("Private key of node: {:?}", keys.0.as_slice());
log_debug!("Public key of node: {}", keys.1);
log_debug!("Private key of node: {}", keys.0);
let thr = task::spawn(run_server_accept_one("127.0.0.1", WS_PORT, keys.0, pubkey));
// time for the server to start
std::thread::sleep(std::time::Duration::from_secs(2));
test_remote_connection("ws://127.0.0.1:3012");
thr.await;
Ok(())
}
}
Loading…
Cancel
Save