moved disconnection handling to the LocalBroker

pull/19/head
Niko PLP 8 months ago
parent c8b7a04ab4
commit 1dbd931b0b
  1. 1
      Cargo.lock
  2. 1
      nextgraph/Cargo.toml
  3. 46
      nextgraph/src/local_broker.rs
  4. 8
      ng-app/src-tauri/src/lib.rs
  5. 2
      ng-app/src/lib/FullLayout.svelte
  6. 35
      ng-app/src/lib/Test.svelte
  7. 48
      ng-app/src/store.ts
  8. 4
      ng-client-ws/src/remote_ws_wasm.rs
  9. 136
      ng-net/src/broker.rs
  10. 31
      ng-sdk-js/src/lib.rs
  11. 4
      ng-verifier/src/request_processor.rs
  12. 6
      ng-verifier/src/types.rs
  13. 130
      ng-verifier/src/verifier.rs

1
Cargo.lock generated

@ -3228,6 +3228,7 @@ dependencies = [
"async-std",
"async-trait",
"base64-url",
"futures",
"ng-client-ws",
"ng-net",
"ng-repo",

@ -32,6 +32,7 @@ async-std = { version = "1.12.0", features = [ "attributes", "unstable" ] }
zeroize = { version = "1.6.0", features = ["zeroize_derive"] }
serde_json = "1.0"
async-trait = "0.1.64"
futures = "0.3.24"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" }

@ -10,6 +10,8 @@
use async_once_cell::OnceCell;
use async_std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use core::fmt;
use futures::channel::mpsc;
use futures::SinkExt;
use ng_net::actor::EActor;
use ng_net::connection::{ClientConfig, IConnect, NoiseFSM, StartConfig};
use ng_net::types::{ClientInfo, ClientType, ProtocolMessage};
@ -386,6 +388,9 @@ struct LocalBroker {
pub opened_sessions_list: Vec<Option<Session>>,
tauri_streams: HashMap<String, CancelFn>,
disconnections_sender: Sender<String>,
disconnections_receiver: Option<Receiver<String>>,
}
impl fmt::Debug for LocalBroker {
@ -407,6 +412,12 @@ impl ILocalBroker for LocalBroker {
session.verifier.deliver(event, overlay).await;
}
}
async fn user_disconnected(&mut self, user_id: UserId) {
if let Some(session) = self.get_mut_session_for_user(&user_id) {
session.verifier.connection_lost();
let _ = self.disconnections_sender.send(user_id.to_string()).await;
}
}
}
// this is used if an Actor does a BROKER.local_broker.respond
@ -512,7 +523,7 @@ impl LocalBroker {
let session = self.opened_sessions_list[*session as usize]
.as_mut()
.ok_or(NgError::SessionNotFound)?;
session.verifier.connected_server_id = None;
session.verifier.connection_lost();
}
None => {}
}
@ -895,7 +906,7 @@ async fn init_(config: LocalBrokerConfig) -> Result<Arc<RwLock<LocalBroker>>, Ng
}
}
};
let (disconnections_sender, disconnections_receiver) = mpsc::unbounded::<String>();
let local_broker = LocalBroker {
config,
wallets,
@ -904,6 +915,8 @@ async fn init_(config: LocalBrokerConfig) -> Result<Arc<RwLock<LocalBroker>>, Ng
opened_sessions: HashMap::new(),
opened_sessions_list: vec![],
tauri_streams: HashMap::new(),
disconnections_sender,
disconnections_receiver: Some(disconnections_receiver),
};
//log_debug!("{:?}", &local_broker);
@ -948,6 +961,7 @@ pub async fn tauri_stream_cancel(stream_id: String) -> Result<(), NgError> {
broker.tauri_stream_cancel(stream_id);
Ok(())
}
/// Initialize the configuration of your local broker
///
/// , by passing in a function (or closure) that returns a `LocalBrokerConfig`.
@ -1383,20 +1397,11 @@ pub async fn user_connect_with_device_info(
));
}
if tried.is_some() && tried.as_ref().unwrap().3.is_none() {
session.verifier.connected_server_id = Some(server_key);
// successful. we can stop here
// load verifier from remote connection (if not RocksDb type, or after import on tauri)
if let Err(e) = session.verifier.bootstrap().await {
session.verifier.connected_server_id = None;
if let Err(e) =
session.verifier.connection_opened(server_key).await
{
Broker::close_all_connections().await;
tried.as_mut().unwrap().3 = Some(e.to_string());
} else {
// we can send outbox now that the verifier is loaded
let res = session.verifier.send_outbox().await;
log_info!("SENDING EVENTS FROM OUTBOX RETURNED: {:?}", res);
//log_info!("VERIFIER DUMP {:?}", session.verifier);
}
break;
@ -1591,6 +1596,19 @@ pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Resul
}
}
#[doc(hidden)]
pub async fn take_disconnections_receiver() -> Result<Receiver<String>, NgError> {
let mut broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized),
Some(Ok(broker)) => broker.write().await,
};
broker
.disconnections_receiver
.take()
.ok_or(NgError::BrokerError)
}
#[cfg(test)]
mod test {
use super::*;

@ -335,14 +335,12 @@ async fn cancel_stream(stream_id: &str) -> Result<(), String> {
}
#[tauri::command(rename_all = "snake_case")]
async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), ()> {
async fn disconnections_subscribe(app: tauri::AppHandle) -> Result<(), String> {
let main_window = app.get_window("main").unwrap();
let reader = BROKER
.write()
let reader = nextgraph::local_broker::take_disconnections_receiver()
.await
.take_disconnections_receiver()
.ok_or(())?;
.map_err(|e: NgError| e.to_string())?;
async fn inner_task(
mut reader: Receiver<String>,

@ -62,6 +62,7 @@
"flex items-center p-2 text-base font-normal text-gray-900 rounded-lg dark:text-white hover:bg-gray-200 dark:hover:bg-gray-700";
</script>
<svelte:window bind:innerWidth={width} />
{#if mobile}
<div class="full-layout">
<main class="pb-14" bind:this={top}>
@ -185,7 +186,6 @@
</main>
</div>
{/if}
<svelte:window bind:innerWidth={width} />
<style>
.full-layout {

@ -11,13 +11,24 @@
<script lang="ts">
import ng from "../api";
import { branch_subs, active_session } from "../store";
import {
branch_subs,
active_session,
cannot_load_offline,
online,
} from "../store";
import { link } from "svelte-spa-router";
import { onMount, onDestroy } from "svelte";
import { Button } from "flowbite-svelte";
let is_tauri = import.meta.env.TAURI_PLATFORM;
let files = branch_subs("ok");
let img_map = {};
onMount(() => {});
async function get_img(ref) {
if (!ref) return false;
let cache = img_map[ref.nuri];
@ -189,11 +200,28 @@
</script>
<div>
{#if $cannot_load_offline}
<div class="row p-4">
<p>
You are offline and using the web app. You need to connect to the broker
at least once before you can start using the app locally because the web
app does not keep a local copy of your documents.<br /><br />
Once connected, if you lose connectivity again, you will be able to have
limited access to some functionalities. Sending binary files won't be possible,
because the limit of local storage in your browser is around 5MB.<br
/><br />
All those limitations will be lifted once the "UserStorage for Web" feature
will be released. Stay tuned! <br /><br />
Check your connection status in the <a href="#/user">user panel</a>.
</p>
</div>
{:else}
<div class="row mt-2">
<!-- <a use:link href="/">
<button tabindex="-1" class=" mr-5 select-none"> Back home </button>
</a> -->
<button
<Button
disabled={!$online && !is_tauri}
type="button"
on:click={() => {
fileinput.click();
@ -216,7 +244,7 @@
/>
</svg>
Add image
</button>
</Button>
<input
style="display:none"
type="file"
@ -241,4 +269,5 @@
</p>
{/each}
{/await}
{/if}
</div>

@ -35,15 +35,28 @@ export const online = derived(connections,($connections) => {
});
return true; }
else if ($connections[cnx].error=="ConnectionError" && !$connections[cnx].connecting && next_reconnect==null) {
console.log("will try reconnect in 1 min");
console.log("will try reconnect in 20 sec");
next_reconnect = setTimeout(async ()=> {
await reconnect();
},60000);
},20000);
}
}
return false;
});
export const cannot_load_offline = writable(false);
if (!get(online) && !import.meta.env.TAURI_PLATFORM) {
cannot_load_offline.set(true);
let unsubscribe = online.subscribe(async (value) => {
if (value) {
cannot_load_offline.set(false);
unsubscribe();
}
});
}
export const has_wallets = derived(wallets,($wallets) => Object.keys($wallets).length);
@ -105,6 +118,8 @@ export const reconnect = async function() {
get(active_session).user,
location.href
));
}catch (e) {
console.error(e)
}
@ -159,13 +174,15 @@ export const branch_subs = function(nura) {
return {
load: async () => {
console.log("load upper");
let already_subscribed = all_branches[nura];
if (!already_subscribed) return;
if (already_subscribed.load) {
console.log("doing the load");
let loader = already_subscribed.load;
already_subscribed.load = undefined;
// already_subscribed.load2 = loader;
await loader();
}
},
subscribe: (run, invalid) => {
@ -178,15 +195,18 @@ export const branch_subs = function(nura) {
already_subscribed = {
load: async () => {
try {
console.log("load down");
let session = get(active_session);
if (!session) {
console.error("no session");
return;
}
await unsub();
unsub();
unsub = () => {};
set([]);
unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_private_subscribe(),
async (commit) => {
//console.log("GOT APP RESPONSE", commit);
console.log("GOT APP RESPONSE", commit);
update( (old) => {old.unshift(commit); return old;} )
});
}
@ -198,14 +218,17 @@ export const branch_subs = function(nura) {
},
increase: () => {
count += 1;
console.log("increase sub to",count);
return readonly({subscribe});
},
decrease: async () => {
decrease: () => {
count -= 1;
if (count == 0) {
await unsub();
delete all_branches[nura];
}
console.log("decrease sub to",count);
// if (count == 0) {
// unsub();
// console.log("removed sub");
// delete all_branches[nura];
// }
},
}
all_branches[nura] = already_subscribed;
@ -213,9 +236,10 @@ export const branch_subs = function(nura) {
let new_store = already_subscribed.increase();
let read_unsub = new_store.subscribe(run, invalid);
return async () => {
return () => {
read_unsub();
await already_subscribed.decrease();
console.log("callback unsub");
already_subscribed.decrease();
}
}

@ -107,7 +107,7 @@ async fn ws_loop(
select! {
r = stream.next().fuse() => match r {
Some(msg) => {
log_debug!("GOT MESSAGE {:?}", msg);
//log_debug!("GOT MESSAGE {:?}", msg);
if let WsMessage::Binary(b) = msg {
receiver.send(ConnectionCommand::Msg(serde_bare::from_slice::<ProtocolMessage>(&b)?)).await
.map_err(|_e| NetError::IoError)?;
@ -120,7 +120,7 @@ async fn ws_loop(
},
s = sender.next().fuse() => match s {
Some(msg) => {
log_debug!("SENDING MESSAGE {:?}", msg);
//log_debug!("SENDING MESSAGE {:?}", msg);
match msg {
ConnectionCommand::Msg(m) => {

@ -68,11 +68,13 @@ pub struct ServerConfig {
#[async_trait::async_trait]
pub trait ILocalBroker: Send + Sync + EActor {
async fn deliver(&mut self, event: Event, overlay: OverlayId, user: UserId);
async fn user_disconnected(&mut self, user_id: UserId);
}
pub static BROKER: Lazy<Arc<RwLock<Broker>>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new())));
pub struct Broker<'a> {
pub struct Broker {
direct_connections: HashMap<BindAddress, DirectConnection>,
/// tuple of optional userId and peer key in montgomery form. userId is always None on the server side.
peers: HashMap<(Option<PubKey>, X25519PubKey), BrokerPeerInfo>,
@ -85,17 +87,15 @@ pub struct Broker<'a> {
shutdown: Option<Receiver<ProtocolError>>,
shutdown_sender: Sender<ProtocolError>,
closing: bool,
server_broker: Option<Box<dyn IServerBroker + Send + Sync + 'a>>,
server_broker: Option<Box<dyn IServerBroker + Send + Sync>>,
disconnections_sender: Sender<String>,
disconnections_receiver: Option<Receiver<String>>,
//local_broker: Option<Box<dyn ILocalBroker + Send + Sync + 'a>>,
local_broker: Option<Arc<RwLock<dyn ILocalBroker + 'a>>>,
local_broker: Option<Arc<RwLock<dyn ILocalBroker>>>,
users_peers: HashMap<UserId, HashSet<X25519PubKey>>,
}
impl<'a> Broker<'a> {
impl Broker {
// pub fn init_local_broker(
// &mut self,
// base_path: Option<PathBuf>,
@ -132,12 +132,12 @@ impl<'a> Broker<'a> {
.ok_or(ProtocolError::BrokerError)
}
pub fn set_server_broker(&mut self, broker: impl IServerBroker + 'a) {
pub fn set_server_broker(&mut self, broker: impl IServerBroker + 'static) {
//log_debug!("set_server_broker");
self.server_broker = Some(Box::new(broker));
}
pub fn set_local_broker(&mut self, broker: Arc<RwLock<dyn ILocalBroker + 'a>>) {
pub fn set_local_broker(&mut self, broker: Arc<RwLock<dyn ILocalBroker>>) {
//log_debug!("set_local_broker");
self.local_broker = Some(broker);
}
@ -166,7 +166,7 @@ impl<'a> Broker<'a> {
pub fn get_server_broker(
&self,
) -> Result<&Box<dyn IServerBroker + Send + Sync + 'a>, ProtocolError> {
) -> Result<&Box<dyn IServerBroker + Send + Sync>, ProtocolError> {
//log_debug!("GET STORAGE {:?}", self.server_storage);
self.server_broker
.as_ref()
@ -175,7 +175,7 @@ impl<'a> Broker<'a> {
pub fn get_server_broker_mut(
&mut self,
) -> Result<&mut Box<dyn IServerBroker + Send + Sync + 'a>, ProtocolError> {
) -> Result<&mut Box<dyn IServerBroker + Send + Sync>, ProtocolError> {
//log_debug!("GET STORAGE {:?}", self.server_storage);
self.server_broker
.as_mut()
@ -183,9 +183,11 @@ impl<'a> Broker<'a> {
}
//Option<Arc<RwLock<dyn ILocalBroker>>>,
pub fn get_local_broker(&self) -> Result<Arc<RwLock<dyn ILocalBroker + 'a>>, NgError> {
pub(crate) fn get_local_broker(&self) -> Result<Arc<RwLock<dyn ILocalBroker>>, ProtocolError> {
Ok(Arc::clone(
self.local_broker.as_ref().ok_or(NgError::BrokerError)?,
self.local_broker
.as_ref()
.ok_or(ProtocolError::BrokerError)?,
))
}
@ -297,102 +299,6 @@ impl<'a> Broker<'a> {
}
}
pub async fn get_block_from_store_with_block_id(
&mut self,
nuri: String,
id: BlockId,
include_children: bool,
) -> Result<Receiver<Block>, ProtocolError> {
// TODO
let (mut tx, rx) = mpsc::unbounded::<Block>();
//log_debug!("cur {}", std::env::current_dir().unwrap().display());
//Err(ProtocolError::AccessDenied)
// let f = std::fs::File::open(
// "../ng-repo/tests/e4e4b57524ce29df826055c368894e912ab03af46f61f6270b4c8796bc6f4221.ng",
// )
// .expect("open of block.ng");
// let mut reader = BufReader::new(f);
// let mut block_buffer: Vec<u8> = Vec::new();
// reader
// .read_to_end(&mut block_buffer)
// .expect("read of test.ng");
let block = serde_bare::from_slice::<Block>(&crate::tests::file::TEST).unwrap();
tx.send(block).await;
Ok(rx)
}
pub async fn get_object_from_store_with_object_ref(
&mut self,
nuri: String,
obj_ref: ObjectRef,
) -> Result<ObjectContent, ProtocolError> {
unimplemented!();
// let blockstream = self
// .get_block_from_store_with_block_id(nuri, obj_ref.id, true)
// .await?;
// let store = Box::new(HashMapBlockStorage::from_block_stream(blockstream).await);
// Object::load(obj_ref.id, Some(obj_ref.key), &store)
// .map_err(|e| match e {
// ObjectParseError::MissingBlocks(_missing) => ProtocolError::MissingBlocks,
// _ => ProtocolError::ObjectParseError,
// })?
// .content()
// .map_err(|_| ProtocolError::ObjectParseError)
}
// pub async fn doc_sync_branch(&mut self, anuri: String) -> (Receiver<Commit>, Sender<Commit>) {
// let obj_ref = ObjectRef {
// id: ObjectId::Blake3Digest32([
// 228, 228, 181, 117, 36, 206, 41, 223, 130, 96, 85, 195, 104, 137, 78, 145, 42, 176,
// 58, 244, 111, 97, 246, 39, 11, 76, 135, 150, 188, 111, 66, 33,
// ]),
// key: SymKey::ChaCha20Key([
// 100, 243, 39, 242, 203, 131, 102, 50, 9, 54, 248, 113, 4, 160, 28, 45, 73, 56, 217,
// 112, 95, 150, 144, 137, 9, 57, 106, 5, 39, 202, 146, 94,
// ]),
// };
// let refs = vec![obj_ref.clone()];
// let metadata = vec![5u8; 55];
// let (member_privkey, member_pubkey) = generate_keypair();
// let overlay = OverlayId::nil();
// let commit = Commit::new(
// &member_privkey,
// &member_pubkey,
// overlay,
// PubKey::nil(),
// QuorumType::NoSigning,
// vec![],
// vec![],
// vec![],
// vec![],
// refs,
// vec![],
// metadata,
// obj_ref.clone(),
// )
// .unwrap();
// let (tx, rx) = mpsc::unbounded::<Commit>();
// async fn send(mut tx: Sender<Commit>, commit: Commit) -> ResultSend<()> {
// while let Ok(_) = tx.send(commit.clone()).await {
// log_debug!("sending");
// sleep!(std::time::Duration::from_secs(3));
// }
// log_debug!("end of sending");
// Ok(())
// }
// spawn_and_log_error(send(tx.clone(), commit));
// (rx, tx.clone())
// }
pub fn reconnecting(&mut self, peer_id: X25519PrivKey, user: Option<PubKey>) {
let peerinfo = self.peers.get_mut(&(user, peer_id));
match peerinfo {
@ -466,7 +372,7 @@ impl<'a> Broker<'a> {
let (shutdown_sender, shutdown_receiver) = mpsc::unbounded::<ProtocolError>();
let mut random_buf = [0u8; 4];
getrandom::getrandom(&mut random_buf).unwrap();
let (disconnections_sender, disconnections_receiver) = mpsc::unbounded::<String>();
Broker {
anonymous_connections: HashMap::new(),
#[cfg(not(target_arch = "wasm32"))]
@ -479,8 +385,6 @@ impl<'a> Broker<'a> {
peers: HashMap::new(),
closing: false,
server_broker: None,
disconnections_sender,
disconnections_receiver: Some(disconnections_receiver),
local_broker: None,
users_peers: HashMap::new(),
}
@ -892,7 +796,7 @@ impl<'a> Broker<'a> {
peer_pubkey: PubKey,
remote_peer_id: [u8; 32],
config: StartConfig,
mut disconnections_sender: Sender<String>,
local_broker: Arc<async_std::sync::RwLock<dyn ILocalBroker>>,
) -> ResultSend<()> {
async move {
let res = join.next().await;
@ -917,7 +821,7 @@ impl<'a> Broker<'a> {
// if all attempts fail :
if let Some(user) = config.get_user() {
disconnections_sender.send(user.to_string()).await;
local_broker.write().await.user_disconnected(user).await;
}
} else {
log_info!("REMOVED");
@ -938,7 +842,7 @@ impl<'a> Broker<'a> {
peer_pubk,
*remote_peer_id_dh.slice(),
config,
self.disconnections_sender.clone(),
self.get_local_broker()?,
));
Ok(())
}
@ -1007,10 +911,6 @@ impl<'a> Broker<'a> {
Ok(())
}
pub fn take_disconnections_receiver(&mut self) -> Option<Receiver<String>> {
self.disconnections_receiver.take()
}
async fn close_peer_connection_x(&mut self, peer_id: X25519PubKey, user: Option<PubKey>) {
if let Some(peer) = self.peers.get_mut(&(user, peer_id)) {
match &mut peer.connected {

@ -340,7 +340,6 @@ pub async fn wallet_read_file(js_file: JsValue) -> Result<JsValue, String> {
#[wasm_bindgen]
pub async fn wallet_was_opened(
js_opened_wallet: JsValue, //SensitiveWallet
in_memory: bool,
) -> Result<JsValue, String> {
let mut opened_wallet = serde_wasm_bindgen::from_value::<SensitiveWallet>(js_opened_wallet)
.map_err(|_| "Deserialization error of SensitiveWallet".to_string())?;
@ -523,7 +522,8 @@ pub async fn app_request_stream(
//let xx = JsValue::from(json!(commit).to_string());
//let _ = callback.call1(&this, &xx);
let this = JsValue::null();
let jsval: JsValue = callback.call1(&this, &xx).unwrap();
match callback.call1(&this, &xx) {
Ok(jsval) => {
let promise_res: Result<js_sys::Promise, JsValue> = jsval.dyn_into();
match promise_res {
Ok(promise) => {
@ -532,7 +532,12 @@ pub async fn app_request_stream(
Err(_) => {}
}
}
log_debug!("END OF LOOP");
Err(e) => {
log_err!("JS callback for app_request_stream failed with {:?}", e);
}
}
}
log_info!("END OF LOOP");
Ok(())
}
@ -619,19 +624,16 @@ pub async fn doc_fetch_private_subscribe() -> Result<JsValue, String> {
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn disconnections_subscribe(callback: &js_sys::Function) -> Result<JsValue, JsValue> {
init_local_broker_with_lazy(&INIT_LOCAL_BROKER).await;
let vec: Vec<u8> = vec![2; 10];
let view = unsafe { Uint8Array::view(&vec) };
let x = JsValue::from(Uint8Array::new(view.as_ref()));
let mut reader;
{
reader = BROKER
.write()
reader = nextgraph::local_broker::take_disconnections_receiver()
.await
.take_disconnections_receiver()
.ok_or(false)?;
.map_err(|e: NgError| false)?;
}
async fn inner_task(
mut reader: Receiver<String>,
callback: js_sys::Function,
@ -639,7 +641,8 @@ pub async fn disconnections_subscribe(callback: &js_sys::Function) -> Result<JsV
while let Some(user_id) = reader.next().await {
let this = JsValue::null();
let xx = serde_wasm_bindgen::to_value(&user_id).unwrap();
let jsval: JsValue = callback.call1(&this, &xx).unwrap();
match callback.call1(&this, &xx) {
Ok(jsval) => {
let promise_res: Result<js_sys::Promise, JsValue> = jsval.dyn_into();
match promise_res {
Ok(promise) => {
@ -648,6 +651,14 @@ pub async fn disconnections_subscribe(callback: &js_sys::Function) -> Result<JsV
Err(_) => {}
}
}
Err(e) => {
log_err!(
"JS callback for disconnections_subscribe failed with {:?}",
e
);
}
}
}
log_debug!("END OF disconnections reader");
Ok(())
}

@ -41,7 +41,9 @@ impl AppRequestCommandV0 {
AppFetchContentV0::Subscribe => {
let (_, branch_id, _) =
Self::open_for_target(verifier, &nuri.target, false).await?;
Ok(verifier.create_branch_subscription(branch_id).await?)
Ok(verifier
.create_branch_subscription(branch_id, false)
.await?)
}
_ => unimplemented!(),
},

@ -172,10 +172,10 @@ pub enum AppFetchContentV0 {
impl AppFetchContentV0 {
pub fn get_or_subscribe(subscribe: bool) -> Self {
if subscribe {
AppFetchContentV0::Subscribe
} else {
if !subscribe {
AppFetchContentV0::Get
} else {
AppFetchContentV0::Subscribe
}
}
}

@ -154,19 +154,25 @@ impl Verifier {
}
pub(crate) async fn push_app_response(&mut self, branch: &BranchId, response: AppResponse) {
// log_info!(
// "push_app_response {} {:?}",
// branch,
// self.branch_subscriptions
// );
log_info!(
"push_app_response {} {:?}",
branch,
self.branch_subscriptions
);
if let Some(sender) = self.branch_subscriptions.get_mut(branch) {
if sender.is_closed() {
log_info!("closed so removed");
self.branch_subscriptions.remove(branch);
} else {
let _ = sender.send(response).await;
}
}
}
pub(crate) async fn create_branch_subscription(
&mut self,
branch: BranchId,
resub: bool,
) -> Result<(Receiver<AppResponse>, CancelFn), VerifierError> {
// async fn send(mut tx: Sender<AppResponse>, msg: AppResponse) -> ResultSend<()> {
// while let Ok(_) = tx.send(msg.clone()).await {
@ -179,11 +185,16 @@ impl Verifier {
// spawn_and_log_error(send(tx.clone(), commit));
//log_info!("#### create_branch_subscription {}", branch);
let (tx, rx) = mpsc::unbounded::<AppResponse>();
log_info!("SUBSCRIBE");
if let Some(returned) = self.branch_subscriptions.insert(branch, tx.clone()) {
log_info!("RESUBSCRIBE");
if !returned.is_closed() {
return Err(VerifierError::DoubleBranchSubscription);
log_info!("FORCE CLOSE");
returned.close_channel();
//return Err(VerifierError::DoubleBranchSubscription);
}
}
if !resub {
//let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {});
for file in self
.user_storage
@ -194,9 +205,13 @@ impl Verifier {
self.push_app_response(&branch, AppResponse::V0(AppResponseV0::File(file)))
.await;
}
}
let fnonce = Box::new(move || {
log_info!("CLOSE_CHANNEL");
if !tx.is_closed() {
tx.close_channel();
}
});
Ok((rx, fnonce))
}
@ -729,18 +744,59 @@ impl Verifier {
Ok(())
}
pub(crate) async fn open_branch<'a>(
pub fn connection_lost(&mut self) {
self.connected_server_id = None;
// for (_, repo) in self.repos.iter_mut() {
// repo.opened_branches = HashMap::new();
// }
}
pub async fn connection_opened(&mut self, peer: DirectPeerId) -> Result<(), NgError> {
self.connected_server_id = Some(peer);
if let Err(e) = self.bootstrap().await {
self.connected_server_id = None;
return Err(e);
}
let res = self.send_outbox().await;
log_info!("SENDING EVENTS FROM OUTBOX RETURNED: {:?}", res);
let mut branches = vec![];
{
for (id, repo) in self.repos.iter() {
for (branch, publisher) in repo.opened_branches.iter() {
branches.push((*id, *branch, *publisher));
}
}
}
let user = self.config.user_priv_key.to_pub();
let broker = BROKER.read().await;
for (repo, branch, publisher) in branches {
let _ = self
.open_branch_(&repo, &branch, publisher, &broker, &user, &peer, true)
.await;
// discarding error.
}
Ok(())
}
pub(crate) async fn open_branch(
&mut self,
repo_id: &RepoId,
branch: &BranchId,
as_publisher: bool,
) -> Result<(), NgError> {
let remote = match self.connected_server_id.as_ref() {
Some(r) => r.clone(),
None => {
let repo = self.repos.get_mut(repo_id).ok_or(NgError::RepoNotFound)?;
repo.opened_branches.insert(*branch, as_publisher);
return Ok(());
}
};
let user = self.config.user_priv_key.to_pub();
let remote = self
.connected_server_id
.as_ref()
.ok_or(NgError::NotConnected)?
.clone();
self.open_branch_(
repo_id,
branch,
@ -748,6 +804,7 @@ impl Verifier {
&BROKER.read().await,
&user,
&remote,
false,
)
.await
}
@ -757,7 +814,10 @@ impl Verifier {
let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap();
let remote = self
.connected_server_id
.to_owned()
.ok_or(NgError::NotConnected)?;
let msg = BlocksPut::V0(BlocksPutV0 {
blocks,
@ -776,7 +836,10 @@ impl Verifier {
let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap();
let remote = self
.connected_server_id
.to_owned()
.ok_or(NgError::NotConnected)?;
let msg = BlocksExist::V0(BlocksExistV0 {
blocks,
@ -797,17 +860,22 @@ impl Verifier {
repo_id: &RepoId,
branch: &BranchId,
as_publisher: bool,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
force: bool,
) -> Result<(), NgError> {
let (need_open, mut need_sub, overlay) = {
let repo = self.repos.get(repo_id).ok_or(NgError::RepoNotFound)?;
let overlay = repo.store.overlay_for_read_on_client_protocol();
if force {
(true, true, overlay)
} else {
match repo.opened_branches.get(branch) {
Some(val) => (false, as_publisher && !val, overlay),
None => (repo.opened_branches.len() == 0, true, overlay),
}
}
};
//log_info!("need_open {} need_sub {}", need_open, need_sub);
@ -922,10 +990,10 @@ impl Verifier {
Ok(())
}
async fn send_event<'a>(
async fn send_event(
&mut self,
event: Event,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
overlay: OverlayId,
@ -937,7 +1005,7 @@ impl Verifier {
.ok_or(NgError::TopicNotFound)?
.to_owned();
self.open_branch_(&repo_id, &branch_id, true, broker, user, remote)
self.open_branch_(&repo_id, &branch_id, true, broker, user, remote, false)
.await?;
let _ = broker
@ -1122,9 +1190,9 @@ impl Verifier {
Ok(())
}
async fn do_sync_req_if_needed<'a>(
async fn do_sync_req_if_needed(
&mut self,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
branch_id: &BranchId,
@ -1183,9 +1251,9 @@ impl Verifier {
Ok(())
}
async fn do_sync_req<'a>(
async fn do_sync_req(
&mut self,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
topic: &TopicId,
@ -1217,7 +1285,7 @@ impl Verifier {
async fn load_store_from_read_cap<'a>(
&mut self,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
store: Arc<Store>,
@ -1300,11 +1368,11 @@ impl Verifier {
Ok(())
}
async fn get_commit<'a>(
async fn get_commit(
commit_ref: ObjectRef,
topic_id: Option<TopicId>,
overlay: &OverlayId,
broker: &RwLockReadGuard<'a, Broker<'a>>,
broker: &RwLockReadGuard<'static, Broker>,
user: &UserId,
remote: &DirectPeerId,
) -> Result<Commit, NgError> {
@ -1347,10 +1415,13 @@ impl Verifier {
let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap();
let remote = self.connected_server_id.to_owned();
match repo.store.has(id) {
Err(StorageError::NotFound) => {
if remote.is_none() {
return Err(NgError::NotFound);
}
let msg = BlocksGet::V0(BlocksGetV0 {
ids: vec![*id],
topic: None,
@ -1358,7 +1429,7 @@ impl Verifier {
overlay: Some(overlay),
});
match broker
.request::<BlocksGet, Block>(&user, &remote, msg)
.request::<BlocksGet, Block>(&user, remote.as_ref().unwrap(), msg)
.await
{
Ok(SoS::Stream(blockstream)) => Ok(Some(blockstream)),
@ -1375,7 +1446,10 @@ impl Verifier {
if self.need_bootstrap() {
let broker = BROKER.read().await;
let user = self.config.user_priv_key.to_pub();
let remote = self.connected_server_id.to_owned().unwrap();
let remote = self
.connected_server_id
.to_owned()
.ok_or(NgError::NotConnected)?;
let private_store_id = self.config.private_store_id.to_owned().unwrap();
let private_store = self.create_private_store_from_credentials()?;

Loading…
Cancel
Save