add_user, list_users from CLI

pull/19/head
Niko PLP 1 year ago
parent 0c98b2bdc6
commit b9764e7983
  1. 45
      Cargo.lock
  2. 20
      ng-app/src-tauri/src/lib.rs
  3. 28
      ng-sdk-js/src/lib.rs
  4. 42
      ng-wallet/src/lib.rs
  5. 2
      ngaccount/src/main.rs
  6. 16
      ngcli/Cargo.toml
  7. 955
      ngcli/src/main.rs
  8. 587
      ngcli/src/old.rs
  9. 4
      ngd/src/cli.rs
  10. 3
      ngd/src/main.rs
  11. 19
      p2p-broker/src/broker_store/account.rs
  12. 11
      p2p-broker/src/server_ws.rs
  13. 19
      p2p-broker/src/storage.rs
  14. 10
      p2p-broker/src/types.rs
  15. 1
      p2p-client-ws/Cargo.toml
  16. 38
      p2p-client-ws/src/lib.rs
  17. 31
      p2p-client-ws/src/remote_ws.rs
  18. 25
      p2p-client-ws/src/remote_ws_wasm.rs
  19. 9
      p2p-net/src/actor.rs
  20. 109
      p2p-net/src/actors/add_user.rs
  21. 86
      p2p-net/src/actors/del_user.rs
  22. 91
      p2p-net/src/actors/list_users.rs
  23. 9
      p2p-net/src/actors/mod.rs
  24. 18
      p2p-net/src/actors/start.rs
  25. 141
      p2p-net/src/broker.rs
  26. 10
      p2p-net/src/broker_storage.rs
  27. 280
      p2p-net/src/connection.rs
  28. 184
      p2p-net/src/errors.rs
  29. 673
      p2p-net/src/types.rs
  30. 7
      p2p-repo/src/kcv_store.rs
  31. 22
      p2p-repo/src/types.rs
  32. 2
      stores-lmdb/Cargo.toml
  33. 67
      stores-lmdb/src/kcv_store.rs

45
Cargo.lock generated

@ -764,6 +764,7 @@ dependencies = [
"anstyle", "anstyle",
"bitflags", "bitflags",
"clap_lex", "clap_lex",
"once_cell",
"strsim", "strsim",
] ]
@ -2816,18 +2817,28 @@ dependencies = [
name = "ngcli" name = "ngcli"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"assert_cmd", "assert_cmd",
"async-std", "async-std",
"base64-url",
"blake3",
"clap",
"ed25519-dalek", "ed25519-dalek",
"fastbloom-rs", "env_logger",
"futures", "futures",
"p2p-broker", "getrandom 0.2.10",
"log",
"p2p-client-ws", "p2p-client-ws",
"p2p-net", "p2p-net",
"p2p-repo", "p2p-repo",
"rand 0.7.3", "rand 0.7.3",
"serde",
"serde_bare",
"serde_bytes",
"serde_json",
"stores-lmdb", "stores-lmdb",
"tempfile", "tempfile",
"zeroize",
] ]
[[package]] [[package]]
@ -3135,7 +3146,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-test", "wasm-bindgen-test",
"ws_stream_wasm", "ws_stream_wasm",
"xactor",
] ]
[[package]] [[package]]
@ -3854,7 +3864,7 @@ dependencies = [
[[package]] [[package]]
name = "rkv" name = "rkv"
version = "0.18.0" version = "0.18.0"
source = "git+https://git.nextgraph.org/NextGraph/rkv.git?rev=8f5ad79c0c93138b1bdc0a1254a7c6b4d357a5d9#8f5ad79c0c93138b1bdc0a1254a7c6b4d357a5d9" source = "git+https://git.nextgraph.org/NextGraph/rkv.git?rev=c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a#c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a"
dependencies = [ dependencies = [
"arrayref", "arrayref",
"bincode", "bincode",
@ -5922,33 +5932,6 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "xactor"
version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a5297548253bd91993bedcc70ae166e64b3fab5ded1965055b2a450777870ff"
dependencies = [
"anyhow",
"async-std",
"async-trait",
"fnv",
"futures",
"once_cell",
"slab",
"xactor-derive",
]
[[package]]
name = "xactor-derive"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce65a6e78fedf06f36b8e7b3154175226fad605971f535062339c05f5caecbed"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "xorfilter-rs" name = "xorfilter-rs"
version = "0.5.1" version = "0.5.1"

@ -31,13 +31,13 @@ pub type SetupHook = Box<dyn FnOnce(&mut App) -> Result<(), Box<dyn std::error::
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn test() -> Result<(), ()> { async fn test() -> Result<(), ()> {
log_info!("test is {}", BROKER.read().await.test()); log_debug!("test is {}", BROKER.read().await.test());
Ok(()) Ok(())
} }
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn wallet_gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> Result<ShuffledPazzle, ()> { async fn wallet_gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> Result<ShuffledPazzle, ()> {
log_info!( log_debug!(
"wallet_gen_shuffle_for_pazzle_opening from rust {}", "wallet_gen_shuffle_for_pazzle_opening from rust {}",
pazzle_length pazzle_length
); );
@ -46,7 +46,7 @@ async fn wallet_gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> Result<Shuf
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn wallet_gen_shuffle_for_pin() -> Result<Vec<u8>, ()> { async fn wallet_gen_shuffle_for_pin() -> Result<Vec<u8>, ()> {
log_info!("wallet_gen_shuffle_for_pin from rust"); log_debug!("wallet_gen_shuffle_for_pin from rust");
Ok(gen_shuffle_for_pin()) Ok(gen_shuffle_for_pin())
} }
@ -56,13 +56,13 @@ async fn wallet_open_wallet_with_pazzle(
pazzle: Vec<u8>, pazzle: Vec<u8>,
pin: [u8; 4], pin: [u8; 4],
) -> Result<EncryptedWallet, String> { ) -> Result<EncryptedWallet, String> {
log_info!("wallet_open_wallet_with_pazzle from rust {:?}", pazzle); log_debug!("wallet_open_wallet_with_pazzle from rust {:?}", pazzle);
open_wallet_with_pazzle(wallet, pazzle, pin).map_err(|e| e.to_string()) open_wallet_with_pazzle(wallet, pazzle, pin).map_err(|e| e.to_string())
} }
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn wallet_create_wallet(mut params: CreateWalletV0) -> Result<CreateWalletResultV0, String> { async fn wallet_create_wallet(mut params: CreateWalletV0) -> Result<CreateWalletResultV0, String> {
//log_info!("wallet_create_wallet from rust {:?}", params); //log_debug!("wallet_create_wallet from rust {:?}", params);
params.result_with_wallet_file = false; params.result_with_wallet_file = false;
let local_save = params.local_save; let local_save = params.local_save;
let res = create_wallet_v0(params).await.map_err(|e| e.to_string()); let res = create_wallet_v0(params).await.map_err(|e| e.to_string());
@ -78,13 +78,13 @@ async fn wallet_create_wallet(mut params: CreateWalletV0) -> Result<CreateWallet
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn encode_create_account(payload: CreateAccountBSP) -> Result<String, ()> { async fn encode_create_account(payload: CreateAccountBSP) -> Result<String, ()> {
log_info!("{:?}", payload); log_debug!("{:?}", payload);
payload.encode().ok_or(()) payload.encode().ok_or(())
} }
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn doc_sync_branch(nuri: &str, stream_id: &str, app: tauri::AppHandle) -> Result<(), ()> { async fn doc_sync_branch(nuri: &str, stream_id: &str, app: tauri::AppHandle) -> Result<(), ()> {
log_info!("doc_sync_branch {} {}", nuri, stream_id); log_debug!("doc_sync_branch {} {}", nuri, stream_id);
let main_window = app.get_window("main").unwrap(); let main_window = app.get_window("main").unwrap();
let mut reader; let mut reader;
@ -107,7 +107,7 @@ async fn doc_sync_branch(nuri: &str, stream_id: &str, app: tauri::AppHandle) ->
BROKER.write().await.tauri_stream_cancel(stream_id); BROKER.write().await.tauri_stream_cancel(stream_id);
log_info!("END OF LOOP"); log_debug!("END OF LOOP");
Ok(()) Ok(())
} }
@ -118,7 +118,7 @@ async fn doc_sync_branch(nuri: &str, stream_id: &str, app: tauri::AppHandle) ->
#[tauri::command(rename_all = "snake_case")] #[tauri::command(rename_all = "snake_case")]
async fn cancel_doc_sync_branch(stream_id: &str) -> Result<(), ()> { async fn cancel_doc_sync_branch(stream_id: &str) -> Result<(), ()> {
log_info!("cancel stream {}", stream_id); log_debug!("cancel stream {}", stream_id);
BROKER BROKER
.write() .write()
.await .await
@ -131,7 +131,7 @@ async fn doc_get_file_from_store_with_object_ref(
nuri: &str, nuri: &str,
obj_ref: ObjectRef, obj_ref: ObjectRef,
) -> Result<ObjectContent, String> { ) -> Result<ObjectContent, String> {
log_info!( log_debug!(
"doc_get_file_from_store_with_object_ref {} {:?}", "doc_get_file_from_store_with_object_ref {} {:?}",
nuri, nuri,
obj_ref obj_ref

@ -148,11 +148,11 @@ pub fn client_info() -> ClientInfoV0 {
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub fn encode_create_account(payload: JsValue) -> JsValue { pub fn encode_create_account(payload: JsValue) -> JsValue {
log_info!("{:?}", payload); log_debug!("{:?}", payload);
let create_account = serde_wasm_bindgen::from_value::<CreateAccountBSP>(payload).unwrap(); let create_account = serde_wasm_bindgen::from_value::<CreateAccountBSP>(payload).unwrap();
log_info!("create_account {:?}", create_account); log_debug!("create_account {:?}", create_account);
let res = create_account.encode(); let res = create_account.encode();
log_info!("res {:?}", res); log_debug!("res {:?}", res);
serde_wasm_bindgen::to_value(&res).unwrap() serde_wasm_bindgen::to_value(&res).unwrap()
} }
@ -182,7 +182,7 @@ pub fn client_info() -> ClientInfoV0 {
let ua = client_details(); let ua = client_details();
let bowser = Bowser::parse(ua); let bowser = Bowser::parse(ua);
//log_info!("{:?}", bowser); //log_debug!("{:?}", bowser);
let details_string = client_details2(bowser, env!("CARGO_PKG_VERSION").to_string()); let details_string = client_details2(bowser, env!("CARGO_PKG_VERSION").to_string());
@ -200,9 +200,9 @@ pub fn client_info() -> ClientInfoV0 {
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub async fn test() { pub async fn test() {
log_info!("test is {}", BROKER.read().await.test()); log_debug!("test is {}", BROKER.read().await.test());
let client_info = client_info(); let client_info = client_info();
log_info!("{:?}", client_info); log_debug!("{:?}", client_info);
} }
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
@ -213,7 +213,7 @@ pub async fn doc_get_file_from_store_with_object_ref(
) -> Result<JsValue, JsValue> { ) -> Result<JsValue, JsValue> {
let obj_ref = serde_wasm_bindgen::from_value::<ObjectRef>(obj_ref_js).unwrap(); let obj_ref = serde_wasm_bindgen::from_value::<ObjectRef>(obj_ref_js).unwrap();
log_info!( log_debug!(
"doc_get_file {} {:?} {}", "doc_get_file {} {:?} {}",
nuri, nuri,
obj_ref.id, obj_ref.id,
@ -271,14 +271,14 @@ pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsVa
Err(_) => {} Err(_) => {}
} }
} }
log_info!("END OF LOOP"); log_debug!("END OF LOOP");
Ok(()) Ok(())
} }
spawn_and_log_error(inner_task(reader, anuri, callback.clone())); spawn_and_log_error(inner_task(reader, anuri, callback.clone()));
let cb = Closure::once(move || { let cb = Closure::once(move || {
log_info!("close channel"); log_debug!("close channel");
sender.close_channel() sender.close_channel()
}); });
//Closure::wrap(Box::new(move |sender| sender.close_channel()) as Box<FnMut(Sender<Commit>)>); //Closure::wrap(Box::new(move |sender| sender.close_channel()) as Box<FnMut(Sender<Commit>)>);
@ -299,7 +299,7 @@ pub async fn probe() {
WS_PORT, WS_PORT,
) )
.await; .await;
log_info!("broker.probe : {:?}", res); log_debug!("broker.probe : {:?}", res);
Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await; Broker::join_shutdown_with_timeout(std::time::Duration::from_secs(5)).await;
} }
@ -315,12 +315,12 @@ pub async fn start() {
//let pub_key = PubKey::Ed25519PubKey(keys.1); //let pub_key = PubKey::Ed25519PubKey(keys.1);
let keys = generate_keypair(); let keys = generate_keypair();
let x_from_ed = keys.1.to_dh_from_ed(); let x_from_ed = keys.1.to_dh_from_ed();
log_info!("Pub from X {}", x_from_ed); log_debug!("Pub from X {}", x_from_ed);
let (client_priv, client) = generate_keypair(); let (client_priv, client) = generate_keypair();
let (user_priv, user) = generate_keypair(); let (user_priv, user) = generate_keypair();
log_info!("start connecting"); log_debug!("start connecting");
let res = BROKER let res = BROKER
.write() .write()
@ -340,7 +340,7 @@ pub async fn start() {
}), }),
) )
.await; .await;
log_info!("broker.connect : {:?}", res); log_debug!("broker.connect : {:?}", res);
if res.is_err() { if res.is_err() {
return Ok(()); return Ok(());
//panic!("Cannot connect"); //panic!("Cannot connect");
@ -352,7 +352,7 @@ pub async fn start() {
async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> { async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> {
async move { async move {
sleep!(std::time::Duration::from_secs(3)); sleep!(std::time::Duration::from_secs(3));
log_info!("timeout"); log_debug!("timeout");
BROKER BROKER
.write() .write()
.await .await

@ -213,7 +213,7 @@ pub fn open_wallet_with_pazzle(
)?; )?;
pazzle_key.zeroize(); pazzle_key.zeroize();
log_info!( log_debug!(
"opening of wallet with pazzle took: {} ms", "opening of wallet with pazzle took: {} ms",
opening_pazzle.elapsed().as_millis() opening_pazzle.elapsed().as_millis()
); );
@ -298,16 +298,16 @@ pub fn display_pazzle(pazzle: &Vec<u8>) -> Vec<String> {
pub fn gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> ShuffledPazzle { pub fn gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> ShuffledPazzle {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut category_indices: Vec<u8> = (0..pazzle_length).collect(); let mut category_indices: Vec<u8> = (0..pazzle_length).collect();
//log_info!("{:?}", category_indices); //log_debug!("{:?}", category_indices);
category_indices.shuffle(&mut rng); category_indices.shuffle(&mut rng);
//log_info!("{:?}", category_indices); //log_debug!("{:?}", category_indices);
let mut emoji_indices: Vec<Vec<u8>> = Vec::with_capacity(pazzle_length.into()); let mut emoji_indices: Vec<Vec<u8>> = Vec::with_capacity(pazzle_length.into());
for _ in 0..pazzle_length { for _ in 0..pazzle_length {
let mut idx: Vec<u8> = (0..15).collect(); let mut idx: Vec<u8> = (0..15).collect();
//log_info!("{:?}", idx); //log_debug!("{:?}", idx);
idx.shuffle(&mut rng); idx.shuffle(&mut rng);
//log_info!("{:?}", idx); //log_debug!("{:?}", idx);
emoji_indices.push(idx) emoji_indices.push(idx)
} }
ShuffledPazzle { ShuffledPazzle {
@ -319,9 +319,9 @@ pub fn gen_shuffle_for_pazzle_opening(pazzle_length: u8) -> ShuffledPazzle {
pub fn gen_shuffle_for_pin() -> Vec<u8> { pub fn gen_shuffle_for_pin() -> Vec<u8> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let mut digits: Vec<u8> = (0..10).collect(); let mut digits: Vec<u8> = (0..10).collect();
//log_info!("{:?}", digits); //log_debug!("{:?}", digits);
digits.shuffle(&mut rng); digits.shuffle(&mut rng);
//log_info!("{:?}", digits); //log_debug!("{:?}", digits);
digits digits
} }
@ -335,7 +335,7 @@ pub fn gen_shuffle_for_pin() -> Vec<u8> {
// for i in &mut mnemonic { // for i in &mut mnemonic {
// *i = choices.chars().nth(ran.gen_range(0, 72)).unwrap(); // *i = choices.chars().nth(ran.gen_range(0, 72)).unwrap();
// } // }
// log_info!("{}", mnemonic.iter().collect::<String>()); // log_debug!("{}", mnemonic.iter().collect::<String>());
// } // }
/// creates a Wallet from a pin, a security text and image (with option to send the bootstrap and wallet to nextgraph.one) /// creates a Wallet from a pin, a security text and image (with option to send the bootstrap and wallet to nextgraph.one)
@ -559,7 +559,7 @@ pub async fn create_wallet_v0(
// sig, // sig,
// }); // });
log_info!( log_debug!(
"creating of wallet took: {} ms", "creating of wallet took: {} ms",
creating_pazzle.elapsed().as_millis() creating_pazzle.elapsed().as_millis()
); );
@ -600,11 +600,11 @@ mod test {
#[test] #[test]
fn test_gen_shuffle() { fn test_gen_shuffle() {
let shuffle = gen_shuffle_for_pazzle_opening(9); let shuffle = gen_shuffle_for_pazzle_opening(9);
log_info!("{:?}", shuffle); log_debug!("{:?}", shuffle);
let shuffle = gen_shuffle_for_pazzle_opening(12); let shuffle = gen_shuffle_for_pazzle_opening(12);
log_info!("{:?}", shuffle); log_debug!("{:?}", shuffle);
let shuffle = gen_shuffle_for_pazzle_opening(15); let shuffle = gen_shuffle_for_pazzle_opening(15);
log_info!("{:?}", shuffle); log_debug!("{:?}", shuffle);
let digits = gen_shuffle_for_pin(); let digits = gen_shuffle_for_pin();
let digits = gen_shuffle_for_pin(); let digits = gen_shuffle_for_pin();
} }
@ -636,26 +636,26 @@ mod test {
.await .await
.expect("create_wallet_v0"); .expect("create_wallet_v0");
log_info!( log_debug!(
"creation of wallet took: {} ms", "creation of wallet took: {} ms",
creation.elapsed().as_millis() creation.elapsed().as_millis()
); );
log_info!("-----------------------------"); log_debug!("-----------------------------");
let mut file = File::create("tests/wallet.ngw").expect("open wallet write file"); let mut file = File::create("tests/wallet.ngw").expect("open wallet write file");
let ser_wallet = to_vec(&NgFile::V0(NgFileV0::Wallet(res.wallet.clone()))).unwrap(); let ser_wallet = to_vec(&NgFile::V0(NgFileV0::Wallet(res.wallet.clone()))).unwrap();
file.write_all(&ser_wallet); file.write_all(&ser_wallet);
log_info!( log_debug!(
"wallet id: {:?}", "wallet id: {:?}",
base64_url::encode(&res.wallet.id().slice()) base64_url::encode(&res.wallet.id().slice())
); );
log_info!("pazzle {:?}", display_pazzle(&res.pazzle)); log_debug!("pazzle {:?}", display_pazzle(&res.pazzle));
log_info!("mnemonic {:?}", display_mnemonic(&res.mnemonic)); log_debug!("mnemonic {:?}", display_mnemonic(&res.mnemonic));
log_info!("pin {:?}", pin); log_debug!("pin {:?}", pin);
if let Wallet::V0(v0) = &res.wallet { if let Wallet::V0(v0) = &res.wallet {
log_info!("security text: {:?}", v0.content.security_txt); log_debug!("security text: {:?}", v0.content.security_txt);
let mut file = let mut file =
File::create("tests/generated_security_image.jpg").expect("open write file"); File::create("tests/generated_security_image.jpg").expect("open write file");
@ -678,7 +678,7 @@ mod test {
.expect("open with mnemonic"); .expect("open with mnemonic");
//log_debug!("encrypted part {:?}", w); //log_debug!("encrypted part {:?}", w);
log_info!( log_debug!(
"opening of wallet with mnemonic took: {} ms", "opening of wallet with mnemonic took: {} ms",
opening_mnemonic.elapsed().as_millis() opening_mnemonic.elapsed().as_millis()
); );
@ -687,7 +687,7 @@ mod test {
let opening_pazzle = Instant::now(); let opening_pazzle = Instant::now();
let w = open_wallet_with_pazzle(Wallet::V0(v0.clone()), res.pazzle.clone(), pin) let w = open_wallet_with_pazzle(Wallet::V0(v0.clone()), res.pazzle.clone(), pin)
.expect("open with pazzle"); .expect("open with pazzle");
log_info!( log_debug!(
"opening of wallet with pazzle took: {} ms", "opening of wallet with pazzle took: {} ms",
opening_pazzle.elapsed().as_millis() opening_pazzle.elapsed().as_millis()
); );

@ -75,7 +75,7 @@ async fn main() -> anyhow::Result<()> {
let addr: Vec<&str> = server_address.split(',').collect(); let addr: Vec<&str> = server_address.split(',').collect();
if addr.len() != 3 { if addr.len() != 3 {
return Err(anyhow!( return Err(anyhow!(
"NG_ACCOUNT_SERVER is invalid. format is IP,PORT,PEERID" "NG_ACCOUNT_SERVER is invalid. format is IP,PORT,PEER_ID"
)); ));
} }
let ip: IP = addr[0].into(); let ip: IP = addr[0].into();

@ -8,15 +8,25 @@ description = "CLI command-line interpreter of NextGraph"
repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs" repository = "https://git.nextgraph.org/NextGraph/nextgraph-rs"
[dependencies] [dependencies]
p2p-repo = { path = "../p2p-repo" } p2p-repo = { path = "../p2p-repo", features = ["server_log_output"] }
p2p-net = { path = "../p2p-net" } p2p-net = { path = "../p2p-net" }
p2p-client-ws = { path = "../p2p-client-ws" } p2p-client-ws = { path = "../p2p-client-ws" }
p2p-broker = { path = "../p2p-broker" }
stores-lmdb = { path = "../stores-lmdb" } stores-lmdb = { path = "../stores-lmdb" }
async-std = { version = "1.12.0", features = ["attributes"] } async-std = { version = "1.12.0", features = ["attributes"] }
futures = "0.3.24" futures = "0.3.24"
tempfile = "3" tempfile = "3"
fastbloom-rs = "0.5.3"
rand = "0.7" rand = "0.7"
ed25519-dalek = "1.0.1" ed25519-dalek = "1.0.1"
assert_cmd = "2.0.5" assert_cmd = "2.0.5"
clap = { version = "4.3.4", features = ["env","string","cargo"] }
log = "0.4"
env_logger = "0.10"
anyhow = "1.0.71"
serde_json = "1.0"
zeroize = { version = "1.6.0" }
base64-url = "2.0.0"
getrandom = "0.2.7"
blake3 = "1.3.1"
serde = { version = "1.0", features = ["derive"] }
serde_bare = "0.5.0"
serde_bytes = "0.11.7"

File diff suppressed because it is too large Load Diff

@ -0,0 +1,587 @@
fn block_size() -> usize {
store_max_value_size()
//store_valid_value_size(0)
}
async fn test_sync(cnx: &mut impl BrokerConnection, user_pub_key: PubKey, userpriv_key: PrivKey) {
fn add_obj(
content: ObjectContent,
deps: Vec<ObjectId>,
expiry: Option<Timestamp>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl RepoStore,
) -> ObjectRef {
let max_object_size = 4000;
let obj = Object::new(
content,
deps,
expiry,
max_object_size,
repo_pubkey,
repo_secret,
);
//log_debug!(">>> add_obj");
log_debug!(" id: {}", obj.id());
//log_debug!(" deps: {:?}", obj.deps());
obj.save(store).unwrap();
obj.reference().unwrap()
}
fn add_commit(
branch: ObjectRef,
author_privkey: PrivKey,
author_pubkey: PubKey,
seq: u32,
deps: Vec<ObjectRef>,
acks: Vec<ObjectRef>,
body_ref: ObjectRef,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl RepoStore,
) -> ObjectRef {
let mut obj_deps: Vec<ObjectId> = vec![];
obj_deps.extend(deps.iter().map(|r| r.id));
obj_deps.extend(acks.iter().map(|r| r.id));
let obj_ref = ObjectRef {
id: ObjectId::Blake3Digest32([1; 32]),
key: SymKey::ChaCha20Key([2; 32]),
};
let refs = vec![obj_ref];
let metadata = vec![5u8; 55];
let expiry = None;
let commit = Commit::new(
author_privkey,
author_pubkey,
seq,
branch,
deps,
acks,
refs,
metadata,
body_ref,
expiry,
)
.unwrap();
//log_debug!("commit: {}", commit.id().unwrap());
add_obj(
ObjectContent::Commit(commit),
obj_deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_branch(
branch: Branch,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl RepoStore,
) -> ObjectRef {
let deps = vec![];
let expiry = None;
let body = CommitBody::Branch(branch);
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_trans(
deps: Vec<ObjectId>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl RepoStore,
) -> ObjectRef {
let expiry = None;
let content = [7u8; 777].to_vec();
let body = CommitBody::Transaction(Transaction::V0(content));
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
fn add_body_ack(
deps: Vec<ObjectId>,
repo_pubkey: PubKey,
repo_secret: SymKey,
store: &mut impl RepoStore,
) -> ObjectRef {
let expiry = None;
let body = CommitBody::Ack(Ack::V0());
//log_debug!("body: {:?}", body);
add_obj(
ObjectContent::CommitBody(body),
deps,
expiry,
repo_pubkey,
repo_secret,
store,
)
}
let mut store = HashMapRepoStore::new();
let mut rng = OsRng {};
// repo
let repo_keypair: Keypair = Keypair::generate(&mut rng);
// log_debug!(
// "repo private key: ({}) {:?}",
// repo_keypair.secret.as_bytes().len(),
// repo_keypair.secret.as_bytes()
// );
// log_debug!(
// "repo public key: ({}) {:?}",
// repo_keypair.public.as_bytes().len(),
// repo_keypair.public.as_bytes()
// );
let _repo_privkey = PrivKey::Ed25519PrivKey(repo_keypair.secret.to_bytes());
let repo_pubkey = PubKey::Ed25519PubKey(repo_keypair.public.to_bytes());
let repo_secret = SymKey::ChaCha20Key([9; 32]);
let repolink = RepoLink::V0(RepoLinkV0 {
id: repo_pubkey,
secret: repo_secret,
peers: vec![],
});
// branch
let branch_keypair: Keypair = Keypair::generate(&mut rng);
//log_debug!("branch public key: {:?}", branch_keypair.public.as_bytes());
let branch_pubkey = PubKey::Ed25519PubKey(branch_keypair.public.to_bytes());
let member_keypair: Keypair = Keypair::generate(&mut rng);
//log_debug!("member public key: {:?}", member_keypair.public.as_bytes());
let member_privkey = PrivKey::Ed25519PrivKey(member_keypair.secret.to_bytes());
let member_pubkey = PubKey::Ed25519PubKey(member_keypair.public.to_bytes());
let metadata = [66u8; 64].to_vec();
let commit_types = vec![CommitType::Ack, CommitType::Transaction];
let secret = SymKey::ChaCha20Key([0; 32]);
let member = MemberV0::new(member_pubkey, commit_types, metadata.clone());
let members = vec![member];
let mut quorum = HashMap::new();
quorum.insert(CommitType::Transaction, 3);
let ack_delay = RelTime::Minutes(3);
let tags = [99u8; 32].to_vec();
let branch = Branch::new(
branch_pubkey,
branch_pubkey,
secret,
members,
quorum,
ack_delay,
tags,
metadata,
);
//log_debug!("branch: {:?}", branch);
log_debug!("branch deps/acks:");
log_debug!("");
log_debug!(" br");
log_debug!(" / \\");
log_debug!(" t1 t2");
log_debug!(" / \\ / \\");
log_debug!(" a3 t4<--t5-->(t1)");
log_debug!(" / \\");
log_debug!(" a6 a7");
log_debug!("");
// commit bodies
let branch_body = add_body_branch(
branch.clone(),
repo_pubkey.clone(),
repo_secret.clone(),
&mut store,
);
let ack_body = add_body_ack(vec![], repo_pubkey, repo_secret, &mut store);
let trans_body = add_body_trans(vec![], repo_pubkey, repo_secret, &mut store);
// create & add commits to store
log_debug!(">> br");
let br = add_commit(
branch_body,
member_privkey,
member_pubkey,
0,
vec![],
vec![],
branch_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t1");
let t1 = add_commit(
branch_body,
member_privkey,
member_pubkey,
1,
vec![br],
vec![],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t2");
let t2 = add_commit(
branch_body,
member_privkey,
member_pubkey,
2,
vec![br],
vec![],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a3");
let a3 = add_commit(
branch_body,
member_privkey,
member_pubkey,
3,
vec![t1],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t4");
let t4 = add_commit(
branch_body,
member_privkey,
member_pubkey,
4,
vec![t2],
vec![t1],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> t5");
let t5 = add_commit(
branch_body,
member_privkey,
member_pubkey,
5,
vec![t1, t2],
vec![t4],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a6");
let a6 = add_commit(
branch_body,
member_privkey,
member_pubkey,
6,
vec![t4],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!(">> a7");
let a7 = add_commit(
branch_body,
member_privkey,
member_pubkey,
7,
vec![t4],
vec![],
ack_body,
repo_pubkey,
repo_secret,
&mut store,
);
let mut public_overlay_cnx = cnx
.overlay_connect(&repolink, true)
.await
.expect("overlay_connect failed");
// Sending everything to the broker
for (v) in store.get_all() {
//log_debug!("SENDING {}", k);
let _ = public_overlay_cnx
.put_block(&v)
.await
.expect("put_block failed");
}
// Now emptying the local store of the client, and adding only 1 commit into it (br)
// we also have received an commit (t5) but we don't know what to do with it...
let mut store = HashMapRepoStore::new();
let br = add_commit(
branch_body,
member_privkey,
member_pubkey,
0,
vec![],
vec![],
branch_body,
repo_pubkey,
repo_secret,
&mut store,
);
let t5 = add_commit(
branch_body,
member_privkey,
member_pubkey,
5,
vec![t1, t2],
vec![t4],
trans_body,
repo_pubkey,
repo_secret,
&mut store,
);
log_debug!("LOCAL STORE HAS {} BLOCKS", store.get_len());
// Let's pretend that we know that the head of the branch in the broker is at commits a6 and a7.
// normally it would be the pub/sub that notifies us of those heads.
// now we want to synchronize with the broker.
let mut filter = Filter::new(FilterBuilder::new(10, 0.01));
for commit_ref in [br, t5] {
match commit_ref.id {
ObjectId::Blake3Digest32(d) => filter.add(&d),
}
}
let cfg = filter.config();
let known_commits = BloomFilter {
k: cfg.hashes,
f: filter.get_u8_array().to_vec(),
};
let known_heads = [br.id];
let remote_heads = [a6.id, a7.id];
let mut synced_blocks_stream = public_overlay_cnx
.sync_branch(remote_heads.to_vec(), known_heads.to_vec(), known_commits)
.await
.expect("sync_branch failed");
let mut i = 0;
while let Some(b) = synced_blocks_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
store.put(&b);
i += 1;
}
log_debug!("SYNCED {} BLOCKS", i);
log_debug!("LOCAL STORE HAS {} BLOCKS", store.get_len());
// now the client can verify the DAG and each commit. Then update its list of heads.
}
async fn test(
cnx: &mut impl BrokerConnection,
pub_key: PubKey,
priv_key: PrivKey,
) -> Result<(), ProtocolError> {
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await?;
cnx.add_user(pub_key, priv_key).await?;
//.expect("add_user 2 (myself) failed");
assert_eq!(
cnx.add_user(PubKey::Ed25519PubKey([1; 32]), priv_key)
.await
.err()
.unwrap(),
ProtocolError::UserAlreadyExists
);
let repo = RepoLink::V0(RepoLinkV0 {
id: PubKey::Ed25519PubKey([1; 32]),
secret: SymKey::ChaCha20Key([0; 32]),
peers: vec![],
});
let mut public_overlay_cnx = cnx.overlay_connect(&repo, true).await?;
log_debug!("put_block");
let my_block_id = public_overlay_cnx
.put_block(&Block::new(
vec![],
ObjectDeps::ObjectIdList(vec![]),
None,
vec![27; 150],
None,
))
.await?;
log_debug!("added block_id to store {}", my_block_id);
let object_id = public_overlay_cnx
.put_object(
ObjectContent::File(File::V0(FileV0 {
content_type: vec![],
metadata: vec![],
content: vec![48; 69000],
})),
vec![],
None,
block_size(),
repo.id(),
repo.secret(),
)
.await?;
log_debug!("added object_id to store {}", object_id);
let mut my_block_stream = public_overlay_cnx
.get_block(my_block_id, true, None)
.await?;
//.expect("get_block failed");
while let Some(b) = my_block_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
}
let mut my_object_stream = public_overlay_cnx.get_block(object_id, true, None).await?;
//.expect("get_block for object failed");
while let Some(b) = my_object_stream.next().await {
log_debug!("GOT BLOCK {}", b.id());
}
let object = public_overlay_cnx.get_object(object_id, None).await?;
//.expect("get_object failed");
log_debug!("GOT OBJECT with ID {}", object.id());
// let object_id = public_overlay_cnx
// .copy_object(object_id, Some(now_timestamp() + 60))
// .await
// .expect("copy_object failed");
// log_debug!("COPIED OBJECT to OBJECT ID {}", object_id);
public_overlay_cnx.delete_object(object_id).await?;
//.expect("delete_object failed");
let res = public_overlay_cnx
.get_object(object_id, None)
.await
.unwrap_err();
log_debug!("result from get object after delete: {}", res);
assert_eq!(res, ProtocolError::NotFound);
//TODO test pin/unpin
// TEST BRANCH SYNC
test_sync(cnx, pub_key, priv_key).await;
Ok(())
}
async fn test_local_connection() {
log_debug!("===== TESTING LOCAL API =====");
let root = tempfile::Builder::new().prefix("ngcli").tempdir().unwrap();
let master_key: [u8; 32] = [0; 32];
std::fs::create_dir_all(root.path()).unwrap();
log_debug!("{}", root.path().to_str().unwrap());
let store = LmdbKCVStore::open(root.path(), master_key);
//let mut server = BrokerServer::new(store, ConfigMode::Local).expect("starting broker");
let (priv_key, pub_key) = generate_keypair();
// let mut cnx = server.local_connection(pub_key);
// test(&mut cnx, pub_key, priv_key).await;
}
async fn test_remote_connection(url: &str) {
log_debug!("===== TESTING REMOTE API =====");
let (priv_key, pub_key) = generate_keypair();
// open cnx
// test(&mut cnx, pub_key, priv_key).await;
}
#[cfg(test)]
mod test {
use crate::{test_local_connection, test_remote_connection};
#[async_std::test]
pub async fn test_local_cnx() {}
use async_std::task;
use p2p_broker::server_ws::*;
use p2p_net::utils::gen_dh_keys;
use p2p_net::WS_PORT;
use p2p_repo::log::*;
use p2p_repo::types::PubKey;
#[async_std::test]
pub async fn test_remote_cnx() -> Result<(), Box<dyn std::error::Error>> {
let keys = gen_dh_keys();
// log_debug!("Public key of node: {:?}", keys.1);
// log_debug!("Private key of node: {:?}", keys.0.as_slice());
log_debug!("Public key of node: {}", keys.1);
log_debug!("Private key of node: {}", keys.0);
let thr = task::spawn(run_server_accept_one("127.0.0.1", WS_PORT, keys.0, pubkey));
// time for the server to start
std::thread::sleep(std::time::Duration::from_secs(2));
test_remote_connection("ws://127.0.0.1:3012");
thr.await;
Ok(())
}
}

@ -31,7 +31,7 @@ pub(crate) struct Cli {
#[arg(short, long, env = "NG_SERVER_KEY")] #[arg(short, long, env = "NG_SERVER_KEY")]
pub key: Option<String>, pub key: Option<String>,
/// Saves to disk the provided or automatically generated key. Only used if file storage is secure. Alternatives are passing the key at every start with --key or NG_SERVER_KEY env var. /// Saves to disk the provided or automatically generated key. Only use if file storage is secure. Alternatives are passing the key at every start with --key or NG_SERVER_KEY env var.
#[arg(long)] #[arg(long)]
pub save_key: bool, pub save_key: bool,
@ -47,7 +47,7 @@ pub(crate) struct Cli {
#[arg(long, requires("core"))] #[arg(long, requires("core"))]
pub core_with_clients: bool, pub core_with_clients: bool,
/// Quick config to forward all requests to another BROKER. format is "[DOMAIN/IP:PORT]@PEERID". An IPv6 should be encased in square brackets [IPv6] and the whole option should be between double quotes. Port defaults to 80 for IPs and 443 for domains /// Quick config to forward all requests to another BROKER. format is "[DOMAIN/IP:PORT]@PEER_ID". An IPv6 should be encased in square brackets [IPv6] and the whole option should be between double quotes. Port defaults to 80 for IPs and 443 for domains
#[arg( #[arg(
short, short,
long, long,

@ -450,6 +450,7 @@ async fn main_inner() -> Result<(), ()> {
|| args.public.is_some() || args.public.is_some()
|| args.dynamic.is_some() || args.dynamic.is_some()
|| args.domain.is_some() || args.domain.is_some()
|| args.domain_private.is_some()
{ {
// QUICK CONFIG // QUICK CONFIG
@ -869,7 +870,7 @@ async fn main_inner() -> Result<(), ()> {
return Err(()); return Err(());
} }
let pub_key_array = decode_key(parts[1]) let pub_key_array = decode_key(parts[1])
.map_err(|_| log_err!("The PEERID provided in the --forward option is invalid"))?; .map_err(|_| log_err!("The PEER_ID provided in the --forward option is invalid"))?;
let peer_id = PubKey::Ed25519PubKey(pub_key_array); let peer_id = PubKey::Ed25519PubKey(pub_key_array);
let server_type = if parts[0].len() > 0 { let server_type = if parts[0].len() > 0 {

@ -18,7 +18,7 @@ use p2p_net::types::*;
use p2p_repo::kcv_store::KCVStore; use p2p_repo::kcv_store::KCVStore;
use p2p_repo::store::*; use p2p_repo::store::*;
use p2p_repo::types::Timestamp; use p2p_repo::types::Timestamp;
use serde_bare::to_vec; use serde_bare::{from_slice, to_vec};
pub struct Account<'a> { pub struct Account<'a> {
/// User ID /// User ID
@ -65,7 +65,7 @@ impl<'a> Account<'a> {
store, store,
}; };
if acc.exists() { if acc.exists() {
return Err(StorageError::BackendError); return Err(StorageError::AlreadyExists);
} }
store.put( store.put(
Self::PREFIX, Self::PREFIX,
@ -75,6 +75,21 @@ impl<'a> Account<'a> {
)?; )?;
Ok(acc) Ok(acc)
} }
pub fn get_all_users(
admins: bool,
store: &'a dyn KCVStore,
) -> Result<Vec<UserId>, StorageError> {
let size = to_vec(&UserId::nil())?.len();
let mut res: Vec<UserId> = vec![];
for user in store.get_all_keys_and_values(Self::PREFIX, size, Some(Self::ADMIN))? {
let admin: bool = from_slice(&user.1)?;
if admin == admins {
let id: UserId = from_slice(&user.0[1..user.0.len() - 1])?;
res.push(id);
}
}
Ok(res)
}
pub fn exists(&self) -> bool { pub fn exists(&self) -> bool {
self.store self.store
.get( .get(

@ -595,7 +595,7 @@ pub async fn run_server_accept_one(
let tcp = connections.next().await.unwrap()?; let tcp = connections.next().await.unwrap()?;
{ {
BROKER.write().await.set_my_peer_id(peer_pub_key); //BROKER.write().await.set_my_peer_id(peer_pub_key);
} }
accept(tcp, peer_priv_key).await; accept(tcp, peer_priv_key).await;
@ -779,12 +779,17 @@ pub async fn run_server_v0(
.map_err(|e| log_err!("Error while opening broker storage: {:?}", e))?; .map_err(|e| log_err!("Error while opening broker storage: {:?}", e))?;
let mut broker = BROKER.write().await; let mut broker = BROKER.write().await;
broker.set_my_peer_id(peer_id);
broker.set_storage(broker_storage); broker.set_storage(broker_storage);
LISTENERS_INFO LISTENERS_INFO
.set(broker.set_listeners(listener_infos)) .set(broker.set_listeners(listener_infos))
.unwrap(); .unwrap();
broker.set_overlays_configs(config.overlays_configs); let server_config = ServerConfig {
overlays_configs: config.overlays_configs,
registration: config.registration,
admin_user: config.admin_user,
peer_id,
};
broker.set_server_config(server_config);
} }
// Actually starting the listeners // Actually starting the listeners

@ -11,18 +11,21 @@
use std::path::PathBuf; use std::path::PathBuf;
use crate::broker_store::account::Account;
use crate::broker_store::invitation::Invitation; use crate::broker_store::invitation::Invitation;
use crate::broker_store::wallet::Wallet; use crate::broker_store::wallet::Wallet;
use crate::types::*; use crate::types::*;
use p2p_net::broker_storage::*; use p2p_net::broker_storage::*;
use p2p_net::errors::ProtocolError;
use p2p_net::types::{BootstrapContentV0, InvitationCode, InvitationV0}; use p2p_net::types::{BootstrapContentV0, InvitationCode, InvitationV0};
use p2p_repo::kcv_store::KCVStore; use p2p_repo::kcv_store::KCVStore;
use p2p_repo::log::*; use p2p_repo::log::*;
use p2p_repo::store::StorageError; use p2p_repo::store::StorageError;
use p2p_repo::types::SymKey; use p2p_repo::types::{PubKey, SymKey};
use stores_lmdb::kcv_store::LmdbKCVStore; use stores_lmdb::kcv_store::LmdbKCVStore;
use stores_lmdb::repo_store::LmdbRepoStore; use stores_lmdb::repo_store::LmdbRepoStore;
#[derive(Debug)]
pub struct LmdbBrokerStorage { pub struct LmdbBrokerStorage {
wallet_storage: LmdbKCVStore, wallet_storage: LmdbKCVStore,
accounts_storage: LmdbKCVStore, accounts_storage: LmdbKCVStore,
@ -94,5 +97,17 @@ impl LmdbBrokerStorage {
} }
impl BrokerStorage for LmdbBrokerStorage { impl BrokerStorage for LmdbBrokerStorage {
fn get_user(&self) {} fn get_user(&self, user_id: PubKey) -> Result<bool, ProtocolError> {
log_debug!("get_user {user_id}");
Ok(Account::open(&user_id, &self.accounts_storage)?.is_admin()?)
}
fn add_user(&self, user_id: PubKey, is_admin: bool) -> Result<(), ProtocolError> {
log_debug!("add_user {user_id} is admin {is_admin}");
Account::create(&user_id, is_admin, &self.accounts_storage)?;
Ok(())
}
fn list_users(&self, admins: bool) -> Result<Vec<PubKey>, ProtocolError> {
log_debug!("list_users that are admin == {admins}");
Ok(Account::get_all_users(admins, &self.accounts_storage)?)
}
} }

@ -6,18 +6,10 @@
// at your option. All files in the project carrying such // at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use p2p_net::types::{BrokerOverlayConfigV0, ListenerV0}; use p2p_net::types::{BrokerOverlayConfigV0, ListenerV0, RegistrationConfig};
use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::types::{PrivKey, PubKey};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Registration config
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RegistrationConfig {
Closed,
Invitation,
Open,
}
/// DaemonConfig Version 0 /// DaemonConfig Version 0
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DaemonConfigV0 { pub struct DaemonConfigV0 {

@ -33,5 +33,4 @@ features = ["js"]
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
getrandom = "0.2.7" getrandom = "0.2.7"
xactor = "0.7.11"
async-tungstenite = { git = "https://git.nextgraph.org/NextGraph/async-tungstenite.git", branch = "nextgraph", features = ["async-std-runtime"] } async-tungstenite = { git = "https://git.nextgraph.org/NextGraph/async-tungstenite.git", branch = "nextgraph", features = ["async-std-runtime"] }

@ -6,44 +6,6 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
#[macro_export]
macro_rules! before {
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident ) => {
let mut actor = BrokerMessageActor::new();
let $receiver = actor.receiver();
let mut $addr = actor
.start()
.await
.map_err(|_e| ProtocolError::ActorError)?;
let $request_id = $addr.actor_id();
//log_debug!("actor ID {}", $request_id);
{
let mut map = $self.actors.write().expect("RwLock poisoned");
map.insert($request_id, $addr.downgrade());
}
};
}
macro_rules! after {
( $self:expr, $request_id:ident, $addr:ident, $receiver:ident, $reply:ident ) => {
//log_debug!("waiting for reply");
$addr.wait_for_stop().await; // TODO add timeout and close connection if there's no reply
let r = $receiver.await;
if r.is_err() {
return Err(ProtocolError::Closing);
}
let $reply = r.unwrap();
//log_debug!("reply arrived {:?}", $reply);
{
let mut map = $self.actors.write().expect("RwLock poisoned");
map.remove(&$request_id);
}
};
}
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub mod remote_ws; pub mod remote_ws;

@ -166,7 +166,7 @@ async fn close_ws(
code: u16, code: u16,
reason: &str, reason: &str,
) -> Result<(), NetError> { ) -> Result<(), NetError> {
log_info!("close_ws {:?}", code); log_debug!("close_ws {:?}", code);
let cmd = if code == 1000 { let cmd = if code == 1000 {
ConnectionCommand::Close ConnectionCommand::Close
@ -177,7 +177,7 @@ async fn close_ws(
} else { } else {
ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap()) ConnectionCommand::Error(NetError::try_from(code - 4949).unwrap())
}; };
log_info!("sending to read loop {:?}", cmd); log_debug!("sending to read loop {:?}", cmd);
let _ = futures::SinkExt::send(receiver, cmd).await; let _ = futures::SinkExt::send(receiver, cmd).await;
stream stream
@ -206,11 +206,11 @@ async fn ws_loop(
select! { select! {
r = stream.next().fuse() => match r { r = stream.next().fuse() => match r {
Some(Ok(msg)) => { Some(Ok(msg)) => {
//log_info!("GOT MESSAGE {:?}", msg); //log_debug!("GOT MESSAGE {:?}", msg);
if msg.is_close() { if msg.is_close() {
if let Message::Close(Some(cf)) = msg { if let Message::Close(Some(cf)) = msg {
log_info!("CLOSE from remote with closeframe: {}",cf.reason); log_debug!("CLOSE from remote with closeframe: {}",cf.reason);
let last_command = match cf.code { let last_command = match cf.code {
CloseCode::Normal => CloseCode::Normal =>
ConnectionCommand::Close, ConnectionCommand::Close,
@ -229,7 +229,7 @@ async fn ws_loop(
} }
else { else {
let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await; let _ = futures::SinkExt::send(receiver, ConnectionCommand::Close).await;
log_info!("CLOSE from remote"); log_debug!("CLOSE from remote");
} }
return Ok(ProtocolError::Closing); return Ok(ProtocolError::Closing);
} else { } else {
@ -237,12 +237,12 @@ async fn ws_loop(
.map_err(|_e| NetError::IoError)?; .map_err(|_e| NetError::IoError)?;
} }
}, },
Some(Err(e)) => {log_info!("GOT ERROR {:?}",e);return Err(NetError::WsError);}, Some(Err(e)) => {log_debug!("GOT ERROR {:?}",e);return Err(NetError::WsError);},
None => break None => break
}, },
s = sender.next().fuse() => match s { s = sender.next().fuse() => match s {
Some(msg) => { Some(msg) => {
//log_info!("SENDING MESSAGE {:?}", msg); //log_debug!("SENDING MESSAGE {:?}", msg);
match msg { match msg {
ConnectionCommand::Msg(m) => { ConnectionCommand::Msg(m) => {
futures::SinkExt::send(&mut stream,Message::binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?; futures::SinkExt::send(&mut stream,Message::binary(serde_bare::to_vec(&m)?)).await.map_err(|_e| NetError::IoError)?;
@ -255,6 +255,9 @@ async fn ws_loop(
}, },
ConnectionCommand::Close => { ConnectionCommand::Close => {
break; break;
},
ConnectionCommand::ReEnter => {
//do nothing. loop
} }
} }
}, },
@ -267,7 +270,7 @@ async fn ws_loop(
match inner_loop(&mut ws, sender, &mut receiver).await { match inner_loop(&mut ws, sender, &mut receiver).await {
Ok(proto_err) => { Ok(proto_err) => {
if proto_err == ProtocolError::Closing { if proto_err == ProtocolError::Closing {
log_info!("ProtocolError::Closing"); log_debug!("ProtocolError::Closing");
let _ = ws.close(None).await; let _ = ws.close(None).await;
} else if proto_err == ProtocolError::NoError { } else if proto_err == ProtocolError::NoError {
close_ws(&mut ws, &mut receiver, 1000, "").await?; close_ws(&mut ws, &mut receiver, 1000, "").await?;
@ -316,12 +319,12 @@ mod test {
let keys = generate_keypair(); let keys = generate_keypair();
let x_from_ed = keys.1.to_dh_from_ed(); let x_from_ed = keys.1.to_dh_from_ed();
log_info!("Pub from X {}", x_from_ed); log_debug!("Pub from X {}", x_from_ed);
let (client_priv, client) = generate_keypair(); let (client_priv, client) = generate_keypair();
let (user_priv, user) = generate_keypair(); let (user_priv, user) = generate_keypair();
log_info!("start connecting"); log_debug!("start connecting");
{ {
let res = BROKER let res = BROKER
.write() .write()
@ -341,7 +344,7 @@ mod test {
}), }),
) )
.await; .await;
log_info!("broker.connect : {:?}", res); log_debug!("broker.connect : {:?}", res);
res.expect("assume the connection succeeds"); res.expect("assume the connection succeeds");
} }
@ -350,7 +353,7 @@ mod test {
async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> { async fn timer_close(remote_peer_id: DirectPeerId, user: Option<PubKey>) -> ResultSend<()> {
async move { async move {
sleep!(std::time::Duration::from_secs(3)); sleep!(std::time::Duration::from_secs(3));
log_info!("timeout"); log_debug!("timeout");
BROKER BROKER
.write() .write()
.await .await
@ -370,7 +373,7 @@ mod test {
#[async_std::test] #[async_std::test]
pub async fn probe() -> Result<(), NgError> { pub async fn probe() -> Result<(), NgError> {
log_info!("start probe"); log_debug!("start probe");
{ {
let res = BROKER let res = BROKER
.write() .write()
@ -381,7 +384,7 @@ mod test {
WS_PORT, WS_PORT,
) )
.await; .await;
log_info!("broker.probe : {:?}", res); log_debug!("broker.probe : {:?}", res);
res.expect("assume the probe succeeds"); res.expect("assume the probe succeeds");
} }

@ -47,7 +47,7 @@ impl IConnect for ConnectionWebSocket {
let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS); let mut cnx = ConnectionBase::new(ConnectionDir::Client, TransportProtocol::WS);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_info!("{:?}", e); //log_debug!("{:?}", e);
NetError::ConnectionError NetError::ConnectionError
})?; })?;
@ -71,7 +71,7 @@ impl IConnect for ConnectionWebSocket {
let url = format!("ws://{}:{}", ip, port); let url = format!("ws://{}:{}", ip, port);
let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| { let (mut ws, wsio) = WsMeta::connect(url, None).await.map_err(|e| {
//log_info!("{:?}", e); //log_debug!("{:?}", e);
ProtocolError::ConnectionError ProtocolError::ConnectionError
})?; })?;
@ -107,7 +107,7 @@ async fn ws_loop(
select! { select! {
r = stream.next().fuse() => match r { r = stream.next().fuse() => match r {
Some(msg) => { Some(msg) => {
log_info!("GOT MESSAGE {:?}", msg); log_debug!("GOT MESSAGE {:?}", msg);
if let WsMessage::Binary(b) = msg { if let WsMessage::Binary(b) = msg {
receiver.send(ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&b)?)).await receiver.send(ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&b)?)).await
.map_err(|_e| NetError::IoError)?; .map_err(|_e| NetError::IoError)?;
@ -120,11 +120,11 @@ async fn ws_loop(
}, },
s = sender.next().fuse() => match s { s = sender.next().fuse() => match s {
Some(msg) => { Some(msg) => {
log_info!("SENDING MESSAGE {:?}", msg); log_debug!("SENDING MESSAGE {:?}", msg);
match msg { match msg {
ConnectionCommand::Msg(m) => { ConnectionCommand::Msg(m) => {
stream.send(WsMessage::Binary(serde_bare::to_vec(&m)?)).await.map_err(|e| { log_info!("{:?}",e); return NetError::IoError;})?; stream.send(WsMessage::Binary(serde_bare::to_vec(&m)?)).await.map_err(|e| { log_debug!("{:?}",e); return NetError::IoError;})?;
}, },
ConnectionCommand::Error(e) => { ConnectionCommand::Error(e) => {
@ -135,6 +135,9 @@ async fn ws_loop(
}, },
ConnectionCommand::Close => { ConnectionCommand::Close => {
break; break;
},
ConnectionCommand::ReEnter => {
//do nothing. loop
} }
} }
}, },
@ -144,7 +147,7 @@ async fn ws_loop(
} }
Ok(ProtocolError::NoError) Ok(ProtocolError::NoError)
} }
log_info!("START of WS loop"); log_debug!("START of WS loop");
let mut events = ws let mut events = ws
.observe(ObserveConfig::default()) .observe(ObserveConfig::default())
//.observe(Filter::Pointer(WsEvent::is_closed).into()) //.observe(Filter::Pointer(WsEvent::is_closed).into())
@ -154,9 +157,9 @@ async fn ws_loop(
Ok(proto_err) => { Ok(proto_err) => {
if proto_err == ProtocolError::NoError { if proto_err == ProtocolError::NoError {
let _ = ws.close_code(1000).await; //.map_err(|_e| NetError::WsError)?; let _ = ws.close_code(1000).await; //.map_err(|_e| NetError::WsError)?;
log_info!("CLOSED GRACEFULLY"); log_debug!("CLOSED GRACEFULLY");
} else { } else {
log_info!("PROTOCOL ERR"); log_debug!("PROTOCOL ERR");
let mut code = proto_err.clone() as u16; let mut code = proto_err.clone() as u16;
if code > 949 { if code > 949 {
code = ProtocolError::OtherError as u16; code = ProtocolError::OtherError as u16;
@ -172,12 +175,12 @@ async fn ws_loop(
.await; .await;
//.map_err(|_e| NetError::WsError)?; //.map_err(|_e| NetError::WsError)?;
//return Err(Box::new(e)); //return Err(Box::new(e));
log_info!("ERR {:?}", e); log_debug!("ERR {:?}", e);
} }
} }
let last_event = events.next().await; let last_event = events.next().await;
log_info!("WS closed {:?}", last_event.clone()); log_debug!("WS closed {:?}", last_event.clone());
let last_command = match last_event { let last_command = match last_event {
None => ConnectionCommand::Close, None => ConnectionCommand::Close,
Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen Some(WsEvent::Open) => ConnectionCommand::Error(NetError::WsError), // this should never happen
@ -210,6 +213,6 @@ async fn ws_loop(
.await .await
.map_err(|_e| NetError::IoError)?; .map_err(|_e| NetError::IoError)?;
log_info!("END of WS loop"); log_debug!("END of WS loop");
Ok(()) Ok(())
} }

@ -120,7 +120,7 @@ impl<
let mut receiver = self.receiver.take().unwrap(); let mut receiver = self.receiver.take().unwrap();
match receiver.next().await { match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => { Some(ConnectionCommand::Msg(msg)) => {
if let ProtocolMessage::BrokerMessage(ref bm) = msg { if let ProtocolMessage::ClientMessage(ref bm) = msg {
if bm.result() == Into::<u16>::into(ProtocolError::PartialContent) if bm.result() == Into::<u16>::into(ProtocolError::PartialContent)
&& TypeId::of::<B>() != TypeId::of::<()>() && TypeId::of::<B>() != TypeId::of::<()>()
{ {
@ -140,7 +140,7 @@ impl<
while let Some(ConnectionCommand::Msg(msg)) = while let Some(ConnectionCommand::Msg(msg)) =
actor_receiver.next().await actor_receiver.next().await
{ {
if let ProtocolMessage::BrokerMessage(ref bm) = msg { if let ProtocolMessage::ClientMessage(ref bm) = msg {
if bm.result() if bm.result()
== Into::<u16>::into(ProtocolError::EndOfStream) == Into::<u16>::into(ProtocolError::EndOfStream)
{ {
@ -155,7 +155,7 @@ impl<
break; break;
} }
} else { } else {
// todo deal with error (not a brokermessage) // todo deal with error (not a ClientMessage)
break; break;
} }
} }
@ -177,6 +177,9 @@ impl<
let response: B = msg.try_into()?; let response: B = msg.try_into()?;
Ok(SoS::<B>::Single(response)) Ok(SoS::<B>::Single(response))
} }
Some(ConnectionCommand::ProtocolError(e)) => Err(e),
Some(ConnectionCommand::Error(e)) => Err(e.into()),
Some(ConnectionCommand::Close) => Err(ProtocolError::Closing),
_ => Err(ProtocolError::ActorError), _ => Err(ProtocolError::ActorError),
} }
} }

@ -0,0 +1,109 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::{ServerConfig, BROKER};
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex;
use p2p_repo::log::*;
use p2p_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
/// Add user account
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct AddUserV0 {
/// User pub key
pub user: PubKey,
/// should the newly added user be an admin of the server
pub is_admin: bool,
}
/// Add user account
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum AddUser {
V0(AddUserV0),
}
impl AddUser {
pub fn user(&self) -> PubKey {
match self {
AddUser::V0(o) => o.user,
}
}
pub fn is_admin(&self) -> bool {
match self {
AddUser::V0(o) => o.is_admin,
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<AddUser, AdminResponse>::new_responder()
}
}
impl TryFrom<ProtocolMessage> for AddUser {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::Start(StartProtocol::Admin(AdminRequest::V0(AdminRequestV0 {
content: AdminRequestContentV0::AddUser(a),
..
}))) = msg
{
Ok(a)
} else {
log_debug!("INVALID {:?}", msg);
Err(ProtocolError::InvalidValue)
}
}
}
impl From<AddUser> for ProtocolMessage {
fn from(msg: AddUser) -> ProtocolMessage {
unimplemented!();
}
}
impl From<AddUser> for AdminRequestContentV0 {
fn from(msg: AddUser) -> AdminRequestContentV0 {
AdminRequestContentV0::AddUser(msg)
}
}
impl Actor<'_, AddUser, AdminResponse> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, AddUser, AdminResponse> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
let req = AddUser::try_from(msg)?;
let broker = BROKER.read().await;
let mut is_admin = req.is_admin();
if let Some(ServerConfig {
admin_user: Some(admin_user),
..
}) = broker.get_config()
{
if *admin_user == req.user() {
is_admin = true;
}
}
let res = broker.get_storage()?.add_user(req.user(), is_admin);
let response: AdminResponseV0 = res.into();
fsm.lock().await.send(response.into()).await?;
Ok(())
}
}

@ -0,0 +1,86 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::BROKER;
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex;
use p2p_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
/// Delete user account V0
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct DelUserV0 {
/// User pub key
pub user: PubKey,
}
/// Delete user account
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum DelUser {
V0(DelUserV0),
}
impl DelUser {
pub fn user(&self) -> PubKey {
match self {
DelUser::V0(o) => o.user,
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<DelUser, AdminResponse>::new_responder()
}
}
impl TryFrom<ProtocolMessage> for DelUser {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::Start(StartProtocol::Admin(AdminRequest::V0(AdminRequestV0 {
content: AdminRequestContentV0::DelUser(a),
..
}))) = msg
{
Ok(a)
} else {
Err(ProtocolError::InvalidValue)
}
}
}
impl From<DelUser> for ProtocolMessage {
fn from(msg: DelUser) -> ProtocolMessage {
unimplemented!();
}
}
impl Actor<'_, DelUser, AdminResponse> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, DelUser, AdminResponse> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
//let req = DelUser::try_from(msg)?;
let res = AdminResponseV0 {
id: 0,
result: 0,
content: AdminResponseContentV0::EmptyResponse,
padding: vec![],
};
fsm.lock().await.send(res.into()).await?;
Ok(())
}
}

@ -0,0 +1,91 @@
/*
* Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
* All rights reserved.
* Licensed under the Apache License, Version 2.0
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
* at your option. All files in the project carrying such
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
use crate::broker::BROKER;
use crate::connection::NoiseFSM;
use crate::types::*;
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex;
use p2p_repo::log::*;
use p2p_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
/// List users registered on this broker
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct ListUsersV0 {
/// should list only the admins. if false, admin users will be excluded
pub admins: bool,
}
/// List users registered on this broker
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ListUsers {
V0(ListUsersV0),
}
impl ListUsers {
pub fn admins(&self) -> bool {
match self {
Self::V0(o) => o.admins,
}
}
pub fn get_actor(&self) -> Box<dyn EActor> {
Actor::<ListUsers, AdminResponse>::new_responder()
}
}
impl TryFrom<ProtocolMessage> for ListUsers {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::Start(StartProtocol::Admin(AdminRequest::V0(AdminRequestV0 {
content: AdminRequestContentV0::ListUsers(a),
..
}))) = msg
{
Ok(a)
} else {
//log_debug!("INVALID {:?}", msg);
Err(ProtocolError::InvalidValue)
}
}
}
impl From<ListUsers> for ProtocolMessage {
fn from(msg: ListUsers) -> ProtocolMessage {
unimplemented!();
}
}
impl From<ListUsers> for AdminRequestContentV0 {
fn from(msg: ListUsers) -> AdminRequestContentV0 {
AdminRequestContentV0::ListUsers(msg)
}
}
impl Actor<'_, ListUsers, AdminResponse> {}
#[async_trait::async_trait]
impl EActor for Actor<'_, ListUsers, AdminResponse> {
async fn respond(
&mut self,
msg: ProtocolMessage,
fsm: Arc<Mutex<NoiseFSM>>,
) -> Result<(), ProtocolError> {
let req = ListUsers::try_from(msg)?;
let res = BROKER.read().await.get_storage()?.list_users(req.admins());
let response: AdminResponseV0 = res.into();
fsm.lock().await.send(response.into()).await?;
Ok(())
}
}

@ -6,3 +6,12 @@ pub use start::*;
pub mod probe; pub mod probe;
pub use probe::*; pub use probe::*;
pub mod add_user;
pub use add_user::*;
pub mod del_user;
pub use del_user::*;
pub mod list_users;
pub use list_users::*;

@ -11,7 +11,7 @@
use crate::actors::noise::Noise; use crate::actors::noise::Noise;
use crate::connection::NoiseFSM; use crate::connection::NoiseFSM;
use crate::types::ExtResponse; use crate::types::{AdminRequest, ExtResponse};
use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage}; use crate::{actor::*, errors::ProtocolError, types::ProtocolMessage};
use async_std::sync::Mutex; use async_std::sync::Mutex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -26,6 +26,8 @@ use std::sync::Arc;
pub enum StartProtocol { pub enum StartProtocol {
Client(ClientHello), Client(ClientHello),
Ext(ExtHello), Ext(ExtHello),
//Core(CoreHello),
Admin(AdminRequest),
} }
impl StartProtocol { impl StartProtocol {
@ -33,16 +35,24 @@ impl StartProtocol {
match self { match self {
StartProtocol::Client(a) => a.type_id(), StartProtocol::Client(a) => a.type_id(),
StartProtocol::Ext(a) => a.type_id(), StartProtocol::Ext(a) => a.type_id(),
StartProtocol::Admin(a) => a.type_id(),
} }
} }
pub fn get_actor(&self) -> Box<dyn EActor> { pub fn get_actor(&self) -> Box<dyn EActor> {
match self { match self {
StartProtocol::Client(a) => a.get_actor(), StartProtocol::Client(a) => a.get_actor(),
StartProtocol::Ext(a) => a.get_actor(), StartProtocol::Ext(a) => a.get_actor(),
StartProtocol::Admin(a) => a.get_actor(),
} }
} }
} }
impl From<StartProtocol> for ProtocolMessage {
fn from(msg: StartProtocol) -> ProtocolMessage {
ProtocolMessage::Start(msg)
}
}
/// External Hello (finalizes the Noise handshake and send first ExtRequest) /// External Hello (finalizes the Noise handshake and send first ExtRequest)
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExtHello { pub struct ExtHello {
@ -59,12 +69,6 @@ impl ExtHello {
} }
} }
// impl BrokerRequest for ExtHello {
// fn send(&self) -> ProtocolMessage {
// ProtocolMessage::Start(StartProtocol::Ext(self.clone()))
// }
// }
impl From<ExtHello> for ProtocolMessage { impl From<ExtHello> for ProtocolMessage {
fn from(msg: ExtHello) -> ProtocolMessage { fn from(msg: ExtHello) -> ProtocolMessage {
ProtocolMessage::Start(StartProtocol::Ext(msg)) ProtocolMessage::Start(StartProtocol::Ext(msg))

@ -59,6 +59,14 @@ pub struct DirectConnection {
cnx: ConnectionBase, cnx: ConnectionBase,
} }
#[derive(Debug)]
pub struct ServerConfig {
pub overlays_configs: Vec<BrokerOverlayConfigV0>,
pub registration: RegistrationConfig,
pub admin_user: Option<PubKey>,
pub peer_id: PubKey,
}
pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new()))); pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new())));
pub struct Broker<'a> { pub struct Broker<'a> {
@ -70,11 +78,10 @@ pub struct Broker<'a> {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
listeners: HashMap<String, ListenerInfo>, listeners: HashMap<String, ListenerInfo>,
bind_addresses: HashMap<BindAddress, String>, bind_addresses: HashMap<BindAddress, String>,
overlays_configs: Vec<BrokerOverlayConfigV0>, config: Option<ServerConfig>,
shutdown: Option<Receiver<ProtocolError>>, shutdown: Option<Receiver<ProtocolError>>,
shutdown_sender: Sender<ProtocolError>, shutdown_sender: Sender<ProtocolError>,
closing: bool, closing: bool,
my_peer_id: Option<PubKey>,
storage: Option<Box<dyn BrokerStorage + Send + Sync + 'a>>, storage: Option<Box<dyn BrokerStorage + Send + Sync + 'a>>,
test: u32, test: u32,
@ -97,16 +104,19 @@ impl<'a> Broker<'a> {
} }
} }
pub fn set_my_peer_id(&mut self, id: PubKey) { pub fn get_config(&self) -> Option<&ServerConfig> {
if self.my_peer_id.is_none() { self.config.as_ref()
self.my_peer_id = Some(id)
}
} }
pub fn set_storage(&mut self, storage: impl BrokerStorage + 'a) { pub fn set_storage(&mut self, storage: impl BrokerStorage + 'a) {
//log_debug!("set_storage");
self.storage = Some(Box::new(storage)); self.storage = Some(Box::new(storage));
} }
pub fn set_server_config(&mut self, config: ServerConfig) {
self.config = Some(config);
}
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn set_listeners( pub fn set_listeners(
&mut self, &mut self,
@ -125,24 +135,29 @@ impl<'a> Broker<'a> {
(copy_listeners, copy_bind_addresses) (copy_listeners, copy_bind_addresses)
} }
pub fn get_storage(&self) -> Result<&Box<dyn BrokerStorage + Send + Sync + 'a>, ProtocolError> {
//log_debug!("GET STORAGE {:?}", self.storage);
self.storage.as_ref().ok_or(ProtocolError::BrokerError)
}
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn authorize( pub fn authorize(
&self, &self,
remote_bind_address: &BindAddress, bind_addresses: &(BindAddress, BindAddress),
auth: Authorization, auth: Authorization,
) -> Result<(), ProtocolError> { ) -> Result<(), ProtocolError> {
match auth {
Authorization::Discover => {
let listener_id = self let listener_id = self
.bind_addresses .bind_addresses
.get(remote_bind_address) .get(&bind_addresses.0)
.ok_or(ProtocolError::BrokerError)?; .ok_or(ProtocolError::BrokerError)?;
let listener = self let listener = self
.listeners .listeners
.get(listener_id) .get(listener_id)
.ok_or(ProtocolError::BrokerError)?; .ok_or(ProtocolError::BrokerError)?;
match auth {
Authorization::Discover => {
if listener.config.discoverable if listener.config.discoverable
&& remote_bind_address.ip.is_private() && bind_addresses.1.ip.is_private()
&& listener.config.accept_forward_for.is_no() && listener.config.accept_forward_for.is_no()
{ {
Ok(()) Ok(())
@ -153,14 +168,35 @@ impl<'a> Broker<'a> {
Authorization::ExtMessage => Err(ProtocolError::AccessDenied), Authorization::ExtMessage => Err(ProtocolError::AccessDenied),
Authorization::Client(user) => Err(ProtocolError::AccessDenied), Authorization::Client(user) => Err(ProtocolError::AccessDenied),
Authorization::Core => Err(ProtocolError::AccessDenied), Authorization::Core => Err(ProtocolError::AccessDenied),
Authorization::Admin(_) => Err(ProtocolError::AccessDenied), Authorization::Admin(admin_user) => {
if listener.config.accepts_client() {
if let Some(ServerConfig {
admin_user: Some(admin),
..
}) = self.config
{
if admin == admin_user {
return Ok(());
}
}
let found = self.get_storage()?.get_user(admin_user);
if found.is_ok() && found.unwrap() {
return Ok(());
}
}
Err(ProtocolError::AccessDenied)
}
Authorization::OverlayJoin(_) => Err(ProtocolError::AccessDenied), Authorization::OverlayJoin(_) => Err(ProtocolError::AccessDenied),
} }
} }
pub fn set_overlays_configs(&mut self, overlays_configs: Vec<BrokerOverlayConfigV0>) { // pub fn add_user(&self, user: PubKey, is_admin: bool) -> Result<(), ProtocolError> {
self.overlays_configs.extend(overlays_configs) // self.get_storage()?.add_user(user, is_admin)
} // }
// pub fn list_users(&self, admins: bool) -> Result<Vec<PubKey>, ProtocolError> {
// self.get_storage()?.list_users(admins)
// }
pub async fn get_block_from_store_with_block_id( pub async fn get_block_from_store_with_block_id(
&mut self, &mut self,
@ -171,7 +207,7 @@ impl<'a> Broker<'a> {
// TODO // TODO
let (mut tx, rx) = mpsc::unbounded::<Block>(); let (mut tx, rx) = mpsc::unbounded::<Block>();
//log_info!("cur {}", std::env::current_dir().unwrap().display()); //log_debug!("cur {}", std::env::current_dir().unwrap().display());
//Err(ProtocolError::AccessDenied) //Err(ProtocolError::AccessDenied)
// let f = std::fs::File::open( // let f = std::fs::File::open(
@ -243,10 +279,10 @@ impl<'a> Broker<'a> {
.unwrap(); .unwrap();
async fn send(mut tx: Sender<Commit>, commit: Commit) -> ResultSend<()> { async fn send(mut tx: Sender<Commit>, commit: Commit) -> ResultSend<()> {
while let Ok(_) = tx.send(commit.clone()).await { while let Ok(_) = tx.send(commit.clone()).await {
log_info!("sending"); log_debug!("sending");
sleep!(std::time::Duration::from_secs(3)); sleep!(std::time::Duration::from_secs(3));
} }
log_info!("end of sending"); log_debug!("end of sending");
Ok(()) Ok(())
} }
spawn_and_log_error(send(tx.clone(), commit)); spawn_and_log_error(send(tx.clone(), commit));
@ -310,7 +346,7 @@ impl<'a> Broker<'a> {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
listeners: HashMap::new(), listeners: HashMap::new(),
bind_addresses: HashMap::new(), bind_addresses: HashMap::new(),
overlays_configs: vec![], config: None,
shutdown: Some(shutdown_receiver), shutdown: Some(shutdown_receiver),
shutdown_sender, shutdown_sender,
direct_connections: HashMap::new(), direct_connections: HashMap::new(),
@ -318,7 +354,6 @@ impl<'a> Broker<'a> {
tauri_streams: HashMap::new(), tauri_streams: HashMap::new(),
closing: false, closing: false,
test: u32::from_be_bytes(random_buf), test: u32::from_be_bytes(random_buf),
my_peer_id: None,
storage: None, storage: None,
} }
} }
@ -346,7 +381,7 @@ impl<'a> Broker<'a> {
async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> { async fn timer_shutdown(timeout: std::time::Duration) -> ResultSend<()> {
async move { async move {
sleep!(timeout); sleep!(timeout);
log_info!("timeout for shutdown"); log_debug!("timeout for shutdown");
let _ = BROKER let _ = BROKER
.write() .write()
.await .await
@ -435,11 +470,11 @@ impl<'a> Broker<'a> {
match res { match res {
Some(Either::Right(remote_peer_id)) => { Some(Either::Right(remote_peer_id)) => {
let res = join.next().await; let res = join.next().await;
log_info!("SOCKET IS CLOSED {:?} peer_id: {:?}", res, remote_peer_id); log_debug!("SOCKET IS CLOSED {:?} peer_id: {:?}", res, remote_peer_id);
BROKER.write().await.remove_peer_id(remote_peer_id, None); BROKER.write().await.remove_peer_id(remote_peer_id, None);
} }
_ => { _ => {
log_info!( log_debug!(
"SOCKET IS CLOSED {:?} remote: {:?} local: {:?}", "SOCKET IS CLOSED {:?} remote: {:?} local: {:?}",
res, res,
remote_bind_address, remote_bind_address,
@ -545,6 +580,45 @@ impl<'a> Broker<'a> {
cnx.probe(ip, port).await cnx.probe(ip, port).await
} }
pub async fn admin<
A: Into<ProtocolMessage>
+ Into<AdminRequestContentV0>
+ std::fmt::Debug
+ Sync
+ Send
+ 'static,
>(
&mut self,
cnx: Box<dyn IConnect>,
peer_privk: PrivKey,
peer_pubk: PubKey,
remote_peer_id: DirectPeerId,
user: PubKey,
user_priv: PrivKey,
addr: BindAddress,
request: A,
) -> Result<AdminResponseContentV0, ProtocolError> {
let config = StartConfig::Admin(AdminConfig {
user,
user_priv,
addr,
request: request.into(),
});
let remote_peer_id_dh = remote_peer_id.to_dh_from_ed();
let mut connection = cnx
.open(
config.get_url(),
peer_privk.clone(),
peer_pubk,
remote_peer_id_dh,
config.clone(),
)
.await?;
connection.admin::<A>().await
}
pub async fn connect( pub async fn connect(
&mut self, &mut self,
cnx: Box<dyn IConnect>, cnx: Box<dyn IConnect>,
@ -557,9 +631,11 @@ impl<'a> Broker<'a> {
return Err(NetError::Closing); return Err(NetError::Closing);
} }
log_info!("CONNECTING"); log_debug!("CONNECTING");
let remote_peer_id_dh = remote_peer_id.to_dh_from_ed(); let remote_peer_id_dh = remote_peer_id.to_dh_from_ed();
// checking if already connected
if config.is_keep_alive() {
let already = self let already = self
.peers .peers
.get(&(config.get_user(), *remote_peer_id_dh.slice())); .get(&(config.get_user(), *remote_peer_id_dh.slice()));
@ -571,6 +647,7 @@ impl<'a> Broker<'a> {
} }
}; };
} }
}
let mut connection = cnx let mut connection = cnx
.open( .open(
@ -582,6 +659,10 @@ impl<'a> Broker<'a> {
) )
.await?; .await?;
if !config.is_keep_alive() {
return Ok(());
}
let join = connection.take_shutdown(); let join = connection.take_shutdown();
let connected = match &config { let connected = match &config {
@ -618,7 +699,7 @@ impl<'a> Broker<'a> {
) -> ResultSend<()> { ) -> ResultSend<()> {
async move { async move {
let res = join.next().await; let res = join.next().await;
log_info!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id); log_debug!("SOCKET IS CLOSED {:?} {:?}", res, remote_peer_id);
if res.is_some() if res.is_some()
&& res.as_ref().unwrap().is_left() && res.as_ref().unwrap().is_left()
&& res.unwrap().unwrap_left() != NetError::Closing && res.unwrap().unwrap_left() != NetError::Closing
@ -630,10 +711,10 @@ impl<'a> Broker<'a> {
// let result = broker // let result = broker
// .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id) // .connect(cnx, ip, core, peer_pubk, peer_privk, remote_peer_id)
// .await; // .await;
// log_info!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id); // log_debug!("SOCKET RECONNECTION {:?} {:?}", result, &remote_peer_id);
// TODO: deal with error and incremental backoff // TODO: deal with error and incremental backoff
} else { } else {
log_info!("REMOVED"); log_debug!("REMOVED");
BROKER BROKER
.write() .write()
.await .await
@ -690,13 +771,13 @@ impl<'a> Broker<'a> {
pub fn print_status(&self) { pub fn print_status(&self) {
self.peers.iter().for_each(|(peerId, peerInfo)| { self.peers.iter().for_each(|(peerId, peerInfo)| {
log_info!("PEER in BROKER {:?} {:?}", peerId, peerInfo); log_debug!("PEER in BROKER {:?} {:?}", peerId, peerInfo);
}); });
self.direct_connections.iter().for_each(|(ip, directCnx)| { self.direct_connections.iter().for_each(|(ip, directCnx)| {
log_info!("direct_connection in BROKER {:?} {:?}", ip, directCnx) log_debug!("direct_connection in BROKER {:?} {:?}", ip, directCnx)
}); });
self.anonymous_connections.iter().for_each(|(binds, cb)| { self.anonymous_connections.iter().for_each(|(binds, cb)| {
log_info!( log_debug!(
"ANONYMOUS remote {:?} local {:?} {:?}", "ANONYMOUS remote {:?} local {:?} {:?}",
binds.1, binds.1,
binds.0, binds.0,

@ -9,9 +9,11 @@
* according to those terms. * according to those terms.
*/ */
use crate::types::*; use crate::{errors::ProtocolError, types::*};
use p2p_repo::kcv_store::KCVStore; use p2p_repo::{kcv_store::KCVStore, types::PubKey};
pub trait BrokerStorage: Send + Sync { pub trait BrokerStorage: Send + Sync + std::fmt::Debug {
fn get_user(&self); fn get_user(&self, user_id: PubKey) -> Result<bool, ProtocolError>;
fn add_user(&self, user_id: PubKey, is_admin: bool) -> Result<(), ProtocolError>;
fn list_users(&self, admins: bool) -> Result<Vec<PubKey>, ProtocolError>;
} }

@ -11,6 +11,7 @@
//static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b"; //static NOISE_CONFIG: &'static str = "Noise_XK_25519_ChaChaPoly_BLAKE2b";
use std::any::TypeId;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
@ -45,6 +46,16 @@ pub enum ConnectionCommand {
Error(NetError), Error(NetError),
ProtocolError(ProtocolError), ProtocolError(ProtocolError),
Close, Close,
ReEnter,
}
impl ConnectionCommand {
pub fn is_re_enter(&self) -> bool {
match self {
Self::ReEnter => true,
_ => false,
}
}
} }
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
@ -96,7 +107,8 @@ pub enum FSMstate {
Noise0, // unused Noise0, // unused
Noise1, Noise1,
Noise2, Noise2,
Noise3, // unused Noise3,
AdminRequest,
ExtRequest, ExtRequest,
ExtResponse, ExtResponse,
ClientHello, ClientHello,
@ -141,9 +153,10 @@ pub enum StepReply {
Response(ProtocolMessage), Response(ProtocolMessage),
NONE, NONE,
CloseNow, CloseNow,
ReEnter,
} }
#[derive(PartialEq, Debug, Clone)] #[derive(Debug, Clone)]
pub struct ClientConfig { pub struct ClientConfig {
pub url: String, pub url: String,
pub user: PubKey, pub user: PubKey,
@ -153,19 +166,24 @@ pub struct ClientConfig {
pub info: ClientInfo, pub info: ClientInfo,
} }
#[derive(PartialEq, Debug, Clone)] #[derive(Debug, Clone)]
pub struct ExtConfig {} pub struct ExtConfig {}
#[derive(PartialEq, Debug, Clone)] #[derive(Debug, Clone)]
pub struct CoreConfig { pub struct CoreConfig {
pub addr: BindAddress, pub addr: BindAddress,
pub interface: String, pub interface: String,
} }
#[derive(PartialEq, Debug, Clone)] #[derive(Debug, Clone)]
pub struct AdminConfig {} pub struct AdminConfig {
pub user: PubKey,
pub user_priv: PrivKey,
pub addr: BindAddress,
pub request: AdminRequestContentV0,
}
#[derive(PartialEq, Debug, Clone)] #[derive(Debug, Clone)]
pub enum StartConfig { pub enum StartConfig {
Probe, Probe,
Relay(BindAddress), Relay(BindAddress),
@ -179,6 +197,7 @@ impl StartConfig {
pub fn get_url(&self) -> String { pub fn get_url(&self) -> String {
match self { match self {
Self::Client(config) => config.url.clone(), Self::Client(config) => config.url.clone(),
Self::Admin(config) => format!("ws://{}:{}", config.addr.ip, config.addr.port),
Self::Core(config) => format!("ws://{}:{}", config.addr.ip, config.addr.port), Self::Core(config) => format!("ws://{}:{}", config.addr.ip, config.addr.port),
_ => unimplemented!(), _ => unimplemented!(),
} }
@ -189,6 +208,12 @@ impl StartConfig {
_ => None, _ => None,
} }
} }
pub fn is_keep_alive(&self) -> bool {
match self {
StartConfig::Core(_) | StartConfig::Client(_) => true,
_ => false,
}
}
} }
impl NoiseFSM { impl NoiseFSM {
@ -313,11 +338,33 @@ impl NoiseFSM {
return Ok(StepReply::NONE); return Ok(StepReply::NONE);
} }
fn process_server_noise3(&mut self, noise: &Noise) -> Result<(), ProtocolError> {
let handshake = self.noise_handshake_state.as_mut().unwrap();
let _ = handshake
.read_message_vec(noise.data())
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?;
if !handshake.completed() {
return Err(ProtocolError::NoiseHandshakeFailed);
}
let peer_id = handshake.get_rs().unwrap();
self.remote = Some(PubKey::X25519PubKey(peer_id));
let ciphers = handshake.get_ciphers();
self.noise_cipher_state_enc = Some(ciphers.1);
self.noise_cipher_state_dec = Some(ciphers.0);
self.noise_handshake_state = None;
Ok(())
}
pub async fn step( pub async fn step(
&mut self, &mut self,
mut msg_opt: Option<ProtocolMessage>, mut msg_opt: Option<ProtocolMessage>,
) -> Result<StepReply, ProtocolError> { ) -> Result<StepReply, ProtocolError> {
if self.noise_cipher_state_dec.is_some() { if self.noise_cipher_state_dec.is_some() && msg_opt.is_some() {
if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() { if let Some(ProtocolMessage::Noise(noise)) = msg_opt.as_ref() {
let new = self.decrypt(noise)?; let new = self.decrypt(noise)?;
msg_opt.replace(new); msg_opt.replace(new);
@ -326,7 +373,11 @@ impl NoiseFSM {
} }
} }
if msg_opt.is_some() { if msg_opt.is_some() {
log_debug!("RECEIVED: {:?}", msg_opt.as_ref().unwrap()); log_debug!(
"RECEIVED: {:?} in state {:?}",
msg_opt.as_ref().unwrap(),
self.state
);
} }
match self.state { match self.state {
FSMstate::Closing => {} FSMstate::Closing => {}
@ -413,8 +464,7 @@ impl NoiseFSM {
.authorize( .authorize(
&self &self
.bind_addresses .bind_addresses
.ok_or(ProtocolError::BrokerError)? .ok_or(ProtocolError::BrokerError)?,
.1,
Authorization::Discover, Authorization::Discover,
) )
.is_ok() .is_ok()
@ -492,6 +542,7 @@ impl NoiseFSM {
let ciphers = handshake.get_ciphers(); let ciphers = handshake.get_ciphers();
let mut next_step = StepReply::NONE;
match self.config.as_ref().unwrap() { match self.config.as_ref().unwrap() {
StartConfig::Client(client_config) => { StartConfig::Client(client_config) => {
let noise3 = let noise3 =
@ -506,7 +557,10 @@ impl NoiseFSM {
todo!(); todo!();
} }
StartConfig::Admin(admin_config) => { StartConfig::Admin(admin_config) => {
todo!(); let noise = Noise::V0(NoiseV0 { data: payload });
self.send(noise.into()).await?;
self.state = FSMstate::Noise3;
next_step = StepReply::ReEnter;
} }
_ => return Err(ProtocolError::InvalidState), _ => return Err(ProtocolError::InvalidState),
} }
@ -516,7 +570,7 @@ impl NoiseFSM {
self.noise_handshake_state = None; self.noise_handshake_state = None;
return Ok(StepReply::NONE); return Ok(next_step);
} }
} }
} }
@ -529,23 +583,7 @@ impl NoiseFSM {
noise, noise,
))) = msg ))) = msg
{ {
let handshake = self.noise_handshake_state.as_mut().unwrap(); self.process_server_noise3(noise)?;
let _ = handshake
.read_message_vec(noise.data())
.map_err(|e| ProtocolError::NoiseHandshakeFailed)?;
if !handshake.completed() {
return Err(ProtocolError::NoiseHandshakeFailed);
}
let peer_id = handshake.get_rs().unwrap();
self.remote = Some(PubKey::X25519PubKey(peer_id));
let ciphers = handshake.get_ciphers();
self.noise_cipher_state_enc = Some(ciphers.1);
self.noise_cipher_state_dec = Some(ciphers.0);
self.noise_handshake_state = None;
let mut nonce_buf = [0u8; 32]; let mut nonce_buf = [0u8; 32];
getrandom::getrandom(&mut nonce_buf).unwrap(); getrandom::getrandom(&mut nonce_buf).unwrap();
@ -560,11 +598,94 @@ impl NoiseFSM {
self.send(server_hello.into()).await?; self.send(server_hello.into()).await?;
return Ok(StepReply::NONE); return Ok(StepReply::NONE);
} else if let ProtocolMessage::Noise(noise) = msg {
self.process_server_noise3(noise)?;
self.state = FSMstate::Noise3;
return Ok(StepReply::NONE);
}
}
}
}
FSMstate::Noise3 => {
// CLIENT after Noise3, sending StartProtocol
if msg_opt.is_none() && !self.dir.is_server() {
match self.config.as_ref().unwrap() {
StartConfig::Client(_) => {
return Err(ProtocolError::InvalidState);
}
StartConfig::Ext(ext_config) => {
todo!();
}
StartConfig::Core(core_config) => {
todo!();
}
StartConfig::Admin(admin_config) => {
let ser = serde_bare::to_vec(&admin_config.request)?;
let sig = sign(&admin_config.user_priv, &admin_config.user, &ser)?;
let admin_req = AdminRequestV0 {
content: admin_config.request.clone(),
id: 0,
sig,
admin_user: admin_config.user,
padding: vec![],
};
let protocol_start = StartProtocol::Admin(AdminRequest::V0(admin_req));
self.send(protocol_start.into()).await?;
self.state = FSMstate::AdminRequest;
return Ok(StepReply::NONE);
}
_ => return Err(ProtocolError::InvalidState),
}
} else if self.dir.is_server() {
// SERVER after Noise3, receives StartProtocol
#[cfg(not(target_arch = "wasm32"))]
if let Some(ProtocolMessage::Start(start_msg)) = msg_opt.as_ref() {
match start_msg {
StartProtocol::Client(_) => {
return Err(ProtocolError::InvalidState);
} }
StartProtocol::Ext(ext_config) => {
todo!();
}
// StartProtocol::Core(core_config) => {
// todo!();
// }
StartProtocol::Admin(AdminRequest::V0(req)) => {
BROKER.read().await.authorize(
&self.bind_addresses.ok_or(ProtocolError::BrokerError)?,
Authorization::Admin(req.admin_user),
)?;
// PROCESS AdminRequest and send back AdminResponse
let ser = serde_bare::to_vec(&req.content)?;
let verif = verify(&ser, req.sig, req.admin_user);
if verif.is_err() {
let result: ProtocolError = verif.unwrap_err().into();
return Err(result);
} else {
self.state = FSMstate::Closing;
return Ok(StepReply::Responder(msg_opt.unwrap()));
}
}
_ => return Err(ProtocolError::InvalidState),
}
}
}
}
FSMstate::AdminRequest => {
// CLIENT side receiving AdminResponse
if let Some(msg) = msg_opt {
if self.dir.is_server() || msg.type_id() != TypeId::of::<AdminResponse>() {
return Err(ProtocolError::InvalidState);
} }
return Ok(StepReply::Response(msg));
} }
} }
FSMstate::Noise3 => {}
FSMstate::ExtRequest => {} FSMstate::ExtRequest => {}
FSMstate::ExtResponse => {} FSMstate::ExtResponse => {}
FSMstate::ClientHello => { FSMstate::ClientHello => {
@ -584,10 +705,10 @@ impl NoiseFSM {
}; };
let ser = serde_bare::to_vec(&content)?; let ser = serde_bare::to_vec(&content)?;
let sig = let sig =
sign(&client_config.client_priv, &client_config.client, &ser)?; sign(&client_config.user_priv, &client_config.user, &ser)?;
let client_auth = ClientAuth::V0(ClientAuthV0 { let client_auth = ClientAuth::V0(ClientAuthV0 {
content, content,
/// Signature by client key /// Signature by user key
sig, sig,
}); });
@ -613,7 +734,7 @@ impl NoiseFSM {
let ser = serde_bare::to_vec(&client_auth.content_v0())?; let ser = serde_bare::to_vec(&client_auth.content_v0())?;
let mut result = ProtocolError::NoError; let mut result = ProtocolError::NoError;
let verif = verify(&ser, client_auth.sig(), client_auth.client()); let verif = verify(&ser, client_auth.sig(), client_auth.user());
if verif.is_err() { if verif.is_err() {
result = verif.unwrap_err().into(); result = verif.unwrap_err().into();
} else { } else {
@ -641,7 +762,7 @@ impl NoiseFSM {
if (result.is_err()) { if (result.is_err()) {
return Err(result); return Err(result);
} }
log_info!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the server side"); log_debug!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the server side");
self.state = FSMstate::AuthResult; self.state = FSMstate::AuthResult;
return Ok(StepReply::NONE); return Ok(StepReply::NONE);
} }
@ -661,7 +782,7 @@ impl NoiseFSM {
self.state = FSMstate::AuthResult; self.state = FSMstate::AuthResult;
log_info!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the client side"); log_debug!("AUTHENTICATION SUCCESSFUL ! waiting for requests on the client side");
return Ok(StepReply::NONE); return Ok(StepReply::NONE);
} }
@ -671,6 +792,9 @@ impl NoiseFSM {
} }
FSMstate::AuthResult => { FSMstate::AuthResult => {
if let Some(msg) = msg_opt { if let Some(msg) = msg_opt {
if msg.type_id() != TypeId::of::<ClientMessage>() {
return Err(ProtocolError::AccessDenied);
}
let id = msg.id(); let id = msg.id();
if self.dir.is_server() && id > 0 || !self.dir.is_server() && id < 0 { if self.dir.is_server() && id > 0 || !self.dir.is_server() && id < 0 {
return Ok(StepReply::Responder(msg)); return Ok(StepReply::Responder(msg));
@ -773,6 +897,7 @@ impl ConnectionBase {
} }
async fn read_loop( async fn read_loop(
mut receiver_tx: Sender<ConnectionCommand>,
mut receiver: Receiver<ConnectionCommand>, mut receiver: Receiver<ConnectionCommand>,
mut sender: Sender<ConnectionCommand>, mut sender: Sender<ConnectionCommand>,
actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>, actors: Arc<Mutex<HashMap<i64, Sender<ConnectionCommand>>>>,
@ -783,15 +908,29 @@ impl ConnectionBase {
ConnectionCommand::Close ConnectionCommand::Close
| ConnectionCommand::Error(_) | ConnectionCommand::Error(_)
| ConnectionCommand::ProtocolError(_) => { | ConnectionCommand::ProtocolError(_) => {
log_info!("EXIT READ LOOP because : {:?}", msg); log_debug!("EXIT READ LOOP because : {:?}", msg);
let mut lock = actors.lock().await;
for actor in lock.values_mut() {
actor.send(msg.clone()).await;
}
break; break;
} }
ConnectionCommand::Msg(proto_msg) => { _ => {
let res; let res;
if let ConnectionCommand::Msg(proto_msg) = msg {
{ {
let mut locked_fsm = fsm.lock().await; let mut locked_fsm = fsm.lock().await;
res = locked_fsm.step(Some(proto_msg)).await; res = locked_fsm.step(Some(proto_msg)).await;
} }
} else if msg.is_re_enter() {
{
let mut locked_fsm = fsm.lock().await;
res = locked_fsm.step(None).await;
}
} else {
panic!("shouldn't be here. ConnectionCommand is read_loop can only have 5 different variants")
}
match res { match res {
Err(e) => { Err(e) => {
if sender if sender
@ -806,6 +945,9 @@ impl ConnectionBase {
let _ = sender.send(ConnectionCommand::Close).await; let _ = sender.send(ConnectionCommand::Close).await;
break; break;
} }
Ok(StepReply::ReEnter) => {
let _ = receiver_tx.send(ConnectionCommand::ReEnter).await;
}
Ok(StepReply::NONE) => {} Ok(StepReply::NONE) => {}
Ok(StepReply::Responder(responder)) => { Ok(StepReply::Responder(responder)) => {
let r = responder let r = responder
@ -852,7 +994,11 @@ impl ConnectionBase {
} }
} }
} }
log_info!("END OF READ LOOP"); log_debug!("END OF READ LOOP");
let mut lock = actors.lock().await;
for actor in lock.drain() {
actor.1.close_channel();
}
Ok(()) Ok(())
} }
@ -895,10 +1041,51 @@ impl ConnectionBase {
// } // }
pub async fn close(&mut self) { pub async fn close(&mut self) {
log_info!("closing..."); log_debug!("closing...");
self.send(ConnectionCommand::Close).await; self.send(ConnectionCommand::Close).await;
} }
pub async fn admin<
A: Into<ProtocolMessage>
+ Into<AdminRequestContentV0>
+ std::fmt::Debug
+ Sync
+ Send
+ 'static,
>(
&mut self,
) -> Result<AdminResponseContentV0, ProtocolError> {
if !self.dir.is_server() {
let mut actor = Box::new(Actor::<A, AdminResponse>::new(0, true));
self.actors.lock().await.insert(0, actor.get_receiver_tx());
let mut receiver = actor.detach_receiver();
match receiver.next().await {
Some(ConnectionCommand::Msg(msg)) => {
self.fsm
.as_ref()
.unwrap()
.lock()
.await
.remove_actor(0)
.await;
let response: AdminResponse = msg.try_into()?;
self.close().await;
if response.result() == 0 {
return Ok(response.content_v0());
}
Err(ProtocolError::try_from(response.result()).unwrap())
}
Some(ConnectionCommand::ProtocolError(e)) => Err(e),
Some(ConnectionCommand::Error(e)) => Err(e.into()),
Some(ConnectionCommand::Close) => Err(ProtocolError::Closing),
_ => Err(ProtocolError::ActorError),
}
} else {
panic!("cannot call probe on a server-side connection");
}
}
pub async fn probe(&mut self) -> Result<Option<PubKey>, ProtocolError> { pub async fn probe(&mut self) -> Result<Option<PubKey>, ProtocolError> {
if !self.dir.is_server() { if !self.dir.is_server() {
let config = StartConfig::Probe; let config = StartConfig::Probe;
@ -984,7 +1171,7 @@ impl ConnectionBase {
self.sender = Some(sender_rx); self.sender = Some(sender_rx);
self.receiver = Some(receiver_tx.clone()); self.receiver = Some(receiver_tx.clone());
self.sender_tx = Some(sender_tx.clone()); self.sender_tx = Some(sender_tx.clone());
self.receiver_tx = Some(receiver_tx); self.receiver_tx = Some(receiver_tx.clone());
let fsm = Arc::new(Mutex::new(NoiseFSM::new( let fsm = Arc::new(Mutex::new(NoiseFSM::new(
bind_addresses, bind_addresses,
@ -998,6 +1185,7 @@ impl ConnectionBase {
self.fsm = Some(Arc::clone(&fsm)); self.fsm = Some(Arc::clone(&fsm));
spawn_and_log_error(Self::read_loop( spawn_and_log_error(Self::read_loop(
receiver_tx,
receiver_rx, receiver_rx,
sender_tx, sender_tx,
Arc::clone(&self.actors), Arc::clone(&self.actors),
@ -1019,14 +1207,14 @@ mod test {
#[async_std::test] #[async_std::test]
pub async fn test_typeid() { pub async fn test_typeid() {
log_info!( log_debug!(
"{:?}", "{:?}",
ClientHello::Noise3(Noise::V0(NoiseV0 { data: vec![] })).type_id() ClientHello::Noise3(Noise::V0(NoiseV0 { data: vec![] })).type_id()
); );
let a = Noise::V0(NoiseV0 { data: [].to_vec() }); let a = Noise::V0(NoiseV0 { data: [].to_vec() });
log_info!("{:?}", a.type_id()); log_debug!("{:?}", a.type_id());
log_info!("{:?}", TypeId::of::<Noise>()); log_debug!("{:?}", TypeId::of::<Noise>());
log_info!("{:?}", ClientHello::Local.type_id()); log_debug!("{:?}", ClientHello::Local.type_id());
log_info!("{:?}", TypeId::of::<ClientHello>()); log_debug!("{:?}", TypeId::of::<ClientHello>());
} }
} }

@ -9,11 +9,11 @@
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use crate::types::BrokerMessage;
use core::fmt; use core::fmt;
use num_enum::IntoPrimitive; use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive; use num_enum::TryFromPrimitive;
use p2p_repo::object::ObjectParseError; use p2p_repo::object::ObjectParseError;
use p2p_repo::store::StorageError;
use p2p_repo::types::Block; use p2p_repo::types::Block;
use p2p_repo::types::ObjectId; use p2p_repo::types::ObjectId;
use std::convert::From; use std::convert::From;
@ -62,11 +62,10 @@ pub enum ProtocolError {
OverlayNotFound, OverlayNotFound,
BrokerError, BrokerError,
NotFound, NotFound,
StoreError,
MissingBlocks, MissingBlocks,
ObjectParseError, ObjectParseError,
InvalidValue, InvalidValue,
UserAlreadyExists, AlreadyExists,
RepoIdRequired, RepoIdRequired,
ConnectionError, ConnectionError,
@ -74,6 +73,8 @@ pub enum ProtocolError {
PeerAlreadyConnected, PeerAlreadyConnected,
OtherError, OtherError,
NetError,
StorageError,
Closing, Closing,
FsmNotReady, FsmNotReady,
MustBeEncrypted, MustBeEncrypted,
@ -85,6 +86,35 @@ pub enum ProtocolError {
InvalidNonce, InvalidNonce,
} //MAX 949 ProtocolErrors } //MAX 949 ProtocolErrors
impl From<NetError> for ProtocolError {
fn from(e: NetError) -> Self {
match e {
NetError::IoError => ProtocolError::IoError,
NetError::WsError => ProtocolError::WsError,
NetError::ConnectionError => ProtocolError::ConnectionError,
NetError::SerializationError => ProtocolError::SerializationError,
NetError::ProtocolError => ProtocolError::OtherError,
NetError::AccessDenied => ProtocolError::AccessDenied,
NetError::PeerAlreadyConnected => ProtocolError::PeerAlreadyConnected,
NetError::Closing => ProtocolError::Closing,
_ => ProtocolError::NetError,
}
}
}
impl From<StorageError> for ProtocolError {
fn from(e: StorageError) -> Self {
match e {
StorageError::NotFound => ProtocolError::NotFound,
StorageError::InvalidValue => ProtocolError::InvalidValue,
StorageError::BackendError => ProtocolError::StorageError,
StorageError::SerializationError => ProtocolError::SerializationError,
StorageError::AlreadyExists => ProtocolError::AlreadyExists,
_ => ProtocolError::StorageError,
}
}
}
impl ProtocolError { impl ProtocolError {
pub fn is_stream(&self) -> bool { pub fn is_stream(&self) -> bool {
*self == ProtocolError::PartialContent || *self == ProtocolError::EndOfStream *self == ProtocolError::PartialContent || *self == ProtocolError::EndOfStream
@ -118,16 +148,6 @@ impl From<ObjectParseError> for ProtocolError {
} }
} }
impl From<p2p_repo::store::StorageError> for ProtocolError {
fn from(e: p2p_repo::store::StorageError) -> Self {
match e {
p2p_repo::store::StorageError::NotFound => ProtocolError::NotFound,
p2p_repo::store::StorageError::InvalidValue => ProtocolError::InvalidValue,
_ => ProtocolError::StoreError,
}
}
}
impl From<serde_bare::error::Error> for ProtocolError { impl From<serde_bare::error::Error> for ProtocolError {
fn from(e: serde_bare::error::Error) -> Self { fn from(e: serde_bare::error::Error) -> Self {
ProtocolError::SerializationError ProtocolError::SerializationError
@ -140,72 +160,72 @@ impl From<serde_bare::error::Error> for NetError {
} }
} }
impl From<BrokerMessage> for Result<(), ProtocolError> { // impl From<BrokerMessage> for Result<(), ProtocolError> {
fn from(msg: BrokerMessage) -> Self { // fn from(msg: BrokerMessage) -> Self {
if !msg.is_response() { // if !msg.is_response() {
panic!("BrokerMessage is not a response"); // panic!("BrokerMessage is not a response");
} // }
match msg.result() { // match msg.result() {
0 => Ok(()), // 0 => Ok(()),
err => Err(ProtocolError::try_from(err).unwrap()), // err => Err(ProtocolError::try_from(err).unwrap()),
} // }
} // }
} // }
impl From<BrokerMessage> for Result<ObjectId, ProtocolError> { // impl From<BrokerMessage> for Result<ObjectId, ProtocolError> {
fn from(msg: BrokerMessage) -> Self { // fn from(msg: BrokerMessage) -> Self {
if !msg.is_response() { // if !msg.is_response() {
panic!("BrokerMessage is not a response"); // panic!("BrokerMessage is not a response");
} // }
match msg.result() { // match msg.result() {
0 => Ok(msg.response_object_id()), // 0 => Ok(msg.response_object_id()),
err => Err(ProtocolError::try_from(err).unwrap()), // err => Err(ProtocolError::try_from(err).unwrap()),
} // }
} // }
} // }
/// Option represents if a Block is available. cannot be returned here. call BrokerMessage.response_block() to get a reference to it. // /// Option represents if a Block is available. cannot be returned here. call BrokerMessage.response_block() to get a reference to it.
impl From<BrokerMessage> for Result<Option<u16>, ProtocolError> { // impl From<BrokerMessage> for Result<Option<u16>, ProtocolError> {
fn from(msg: BrokerMessage) -> Self { // fn from(msg: BrokerMessage) -> Self {
if !msg.is_response() { // if !msg.is_response() {
panic!("BrokerMessage is not a response"); // panic!("BrokerMessage is not a response");
} // }
//let partial: u16 = ProtocolError::PartialContent.into(); // //let partial: u16 = ProtocolError::PartialContent.into();
let res = msg.result(); // let res = msg.result();
if res == 0 || ProtocolError::try_from(res).unwrap().is_stream() { // if res == 0 || ProtocolError::try_from(res).unwrap().is_stream() {
if msg.is_overlay() { // if msg.is_overlay() {
match msg.response_block() { // match msg.response_block() {
Some(_) => Ok(Some(res)), // Some(_) => Ok(Some(res)),
None => Ok(None), // None => Ok(None),
} // }
} else { // } else {
Ok(None) // Ok(None)
} // }
} else { // } else {
Err(ProtocolError::try_from(res).unwrap()) // Err(ProtocolError::try_from(res).unwrap())
} // }
} // }
} // }
/// Option represents if a Block is available. returns a clone. // /// Option represents if a Block is available. returns a clone.
impl From<BrokerMessage> for Result<Option<Block>, ProtocolError> { // impl From<BrokerMessage> for Result<Option<Block>, ProtocolError> {
fn from(msg: BrokerMessage) -> Self { // fn from(msg: BrokerMessage) -> Self {
if !msg.is_response() { // if !msg.is_response() {
panic!("BrokerMessage is not a response"); // panic!("BrokerMessage is not a response");
} // }
//let partial: u16 = ProtocolError::PartialContent.into(); // //let partial: u16 = ProtocolError::PartialContent.into();
let res = msg.result(); // let res = msg.result();
if res == 0 || ProtocolError::try_from(res).unwrap().is_stream() { // if res == 0 || ProtocolError::try_from(res).unwrap().is_stream() {
if msg.is_overlay() { // if msg.is_overlay() {
match msg.response_block() { // match msg.response_block() {
Some(b) => Ok(Some(b.clone())), // Some(b) => Ok(Some(b.clone())),
None => Ok(None), // None => Ok(None),
} // }
} else { // } else {
Ok(None) // Ok(None)
} // }
} else { // } else {
Err(ProtocolError::try_from(res).unwrap()) // Err(ProtocolError::try_from(res).unwrap())
} // }
} // }
} // }

File diff suppressed because it is too large Load Diff

@ -68,6 +68,13 @@ pub trait ReadTransaction {
suffix: Option<u8>, suffix: Option<u8>,
value: &Vec<u8>, value: &Vec<u8>,
) -> Result<(), StorageError>; ) -> Result<(), StorageError>;
fn get_all_keys_and_values(
&self,
prefix: u8,
key_size: usize,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError>;
} }
pub trait KCVStore: ReadTransaction { pub trait KCVStore: ReadTransaction {

@ -120,6 +120,9 @@ impl PubKey {
_ => panic!("can only convert an edward key to montgomery"), _ => panic!("can only convert an edward key to montgomery"),
} }
} }
pub fn nil() -> Self {
PubKey::Ed25519PubKey([0u8; 32])
}
} }
impl fmt::Display for PubKey { impl fmt::Display for PubKey {
@ -190,12 +193,27 @@ impl TryFrom<&[u8]> for PrivKey {
} }
} }
impl TryFrom<&str> for PrivKey {
type Error = NgError;
fn try_from(str: &str) -> Result<Self, NgError> {
let key = decode_key(str).map_err(|_| NgError::InvalidKey)?;
Ok(PrivKey::Ed25519PrivKey(key))
}
}
impl fmt::Display for PrivKey { impl fmt::Display for PrivKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let priv_key_ser = serde_bare::to_vec(self).unwrap(); match self {
let prix_key_encoded = base64_url::encode(&priv_key_ser); Self::Ed25519PrivKey(ed) => {
//let priv_key_ser = serde_bare::to_vec(ed).unwrap();
let prix_key_encoded = base64_url::encode(ed);
write!(f, "{}", prix_key_encoded) write!(f, "{}", prix_key_encoded)
} }
_ => {
unimplemented!();
}
}
}
} }
/// Ed25519 signature /// Ed25519 signature

@ -16,5 +16,5 @@ hex = "0.4.3"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.rkv] [target.'cfg(not(target_arch = "wasm32"))'.dependencies.rkv]
git = "https://git.nextgraph.org/NextGraph/rkv.git" git = "https://git.nextgraph.org/NextGraph/rkv.git"
rev = "8f5ad79c0c93138b1bdc0a1254a7c6b4d357a5d9" rev = "c746abb443b7bb4541ebbef2b71e8d0f9eb39f6a"
features = [ "lmdb" ] features = [ "lmdb" ]

@ -41,6 +41,14 @@ impl<'a> LmdbTransaction<'a> {
} }
impl<'a> ReadTransaction for LmdbTransaction<'a> { impl<'a> ReadTransaction for LmdbTransaction<'a> {
fn get_all_keys_and_values(
&self,
prefix: u8,
key_size: usize,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
unimplemented!();
}
/// Load a single value property from the store. /// Load a single value property from the store.
fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> { fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> {
let property = LmdbKCVStore::compute_property(prefix, key, suffix); let property = LmdbKCVStore::compute_property(prefix, key, suffix);
@ -219,7 +227,66 @@ pub struct LmdbKCVStore {
path: String, path: String,
} }
fn compare<T: Ord>(a: &[T], b: &[T]) -> std::cmp::Ordering {
let mut iter_b = b.iter();
for v in a {
match iter_b.next() {
Some(w) => match v.cmp(w) {
std::cmp::Ordering::Equal => continue,
ord => return ord,
},
None => break,
}
}
return a.len().cmp(&b.len());
}
impl ReadTransaction for LmdbKCVStore { impl ReadTransaction for LmdbKCVStore {
fn get_all_keys_and_values(
&self,
prefix: u8,
key_size: usize,
suffix: Option<u8>,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, StorageError> {
let vec_key_start = vec![0u8; key_size];
let vec_key_end = vec![255u8; key_size];
let property_start = Self::compute_property(prefix, &vec_key_start, suffix);
let property_end = Self::compute_property(prefix, &vec_key_end, suffix);
let lock = self.environment.read().unwrap();
let reader = lock.read().unwrap();
let mut iter = self
.main_store
.iter_from(&reader, property_start)
.map_err(|e| StorageError::BackendError)?;
let mut vector: Vec<(Vec<u8>, Vec<u8>)> = vec![];
while let res = iter.next() {
match res {
Some(Ok(val)) => {
match compare(val.0, property_end.as_slice()) {
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {
if suffix.is_some() {
if val.0.len() < (key_size + 2)
|| val.0[1 + key_size] != suffix.unwrap()
{
continue;
}
} else if val.0.len() > (key_size + 1) {
continue;
}
vector.push((val.0.to_vec(), val.1.to_bytes().unwrap()));
}
_ => {} //,
}
}
Some(Err(_e)) => return Err(StorageError::BackendError),
None => {
break;
}
}
}
Ok(vector)
}
/// Load a single value property from the store. /// Load a single value property from the store.
fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> { fn get(&self, prefix: u8, key: &Vec<u8>, suffix: Option<u8>) -> Result<Vec<u8>, StorageError> {
let property = Self::compute_property(prefix, key, suffix); let property = Self::compute_property(prefix, key, suffix);

Loading…
Cancel
Save