From 30c36dafca53a96228a002693c30420ed2e29abd Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Sat, 11 May 2024 02:08:47 +0300 Subject: [PATCH] bloomfilters for branch sync_req + improved disconnection and subscription handling --- Cargo.lock | 1 + nextgraph/src/local_broker.rs | 61 ++++++++--- ng-app/src-tauri/src/lib.rs | 11 ++ ng-app/src/App.svelte | 4 +- ng-app/src/api.ts | 3 +- ng-app/src/lib/FullLayout.svelte | 4 + ng-app/src/lib/Login.svelte | 4 +- ng-app/src/lib/Test.svelte | 41 +++---- ng-app/src/routes/WalletLogin.svelte | 30 ++++-- ng-app/src/store.ts | 24 +++-- ng-app/src/wallet_emojis.ts | 2 +- ng-broker/src/rocksdb_server_storage.rs | 10 +- ng-broker/src/server_broker.rs | 16 ++- ng-broker/src/server_storage/core/topic.rs | 7 +- ng-net/src/actors/client/event.rs | 11 +- ng-net/src/actors/client/topic_sync_req.rs | 4 + ng-net/src/broker.rs | 27 +++-- ng-net/src/server_broker.rs | 3 + ng-net/src/types.rs | 24 ++++- ng-repo/src/branch.rs | 48 ++++----- ng-repo/src/errors.rs | 1 + ng-repo/src/kcv_storage.rs | 118 ++++++++++++++++++--- ng-repo/src/repo.rs | 3 + ng-repo/src/store.rs | 2 + ng-repo/src/types.rs | 5 + ng-sdk-js/src/lib.rs | 11 ++ ng-verifier/Cargo.toml | 1 + ng-verifier/src/commits/mod.rs | 2 + ng-verifier/src/request_processor.rs | 8 ++ ng-verifier/src/types.rs | 14 +++ ng-verifier/src/user_storage/branch.rs | 34 +++++- ng-verifier/src/verifier.rs | 51 ++++++--- ng-wallet/src/emojis.rs | 2 +- ng-wallet/src/types.rs | 1 + 34 files changed, 459 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 654face..ab48578 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3429,6 +3429,7 @@ dependencies = [ "blake3", "chacha20", "either", + "fastbloom-rs", "futures", "getrandom 0.2.10", "ng-net", diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index 5e6744f..dd35337 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -478,6 +478,21 @@ impl LocalBroker { } } + pub fn get_site_store_of_session( + &self, + session: &Session, + store_type: SiteStoreType, + ) -> Result { + match self.opened_wallets.get(&session.config.wallet_name()) { + Some(opened_wallet) => { + let user_id = session.config.user_id(); + let site = opened_wallet.wallet.site(&user_id)?; + Ok(site.get_site_store_id(store_type)) + } + None => Err(NgError::WalletNotFound), + } + } + fn verifier_config_type_from_session_config( &self, config: &SessionConfig, @@ -609,6 +624,8 @@ impl LocalBroker { mut config: SessionConfig, user_priv_key: Option, ) -> Result { + let intermediary_step = user_priv_key.is_some(); + let broker = self; let wallet_name: String = config.wallet_name(); @@ -640,6 +657,9 @@ impl LocalBroker { return Ok(SessionInfo { session_id: *idx, user: user_id, + private_store_id: broker + .get_site_store_of_session(sess, SiteStoreType::Private)? + .to_string(), }); } } @@ -807,19 +827,28 @@ impl LocalBroker { //load verifier from local_storage (if rocks_db) let _ = verifier.load(); - - broker.opened_sessions_list.push(Some(Session { + let session = Session { config, peer_key: session.peer_key.clone(), last_wallet_nonce: session.last_wallet_nonce, verifier, - })); + }; + let private_store_id = if intermediary_step { + "".to_string() + } else { + broker + .get_site_store_of_session(&session, SiteStoreType::Private)? + .to_string() + }; + + broker.opened_sessions_list.push(Some(session)); let idx = broker.opened_sessions_list.len() - 1; broker.opened_sessions.insert(user_id, idx as u64); Ok(SessionInfo { session_id: idx as u64, user: user_id, + private_store_id, }) } } @@ -1031,7 +1060,7 @@ pub async fn wallet_create_v0(params: CreateWalletV0) -> Result Result Result { +pub async fn personal_site_store( + session_id: u64, + store_type: SiteStoreType, +) -> Result { let broker = match LOCAL_BROKER.get() { None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized), Some(Ok(broker)) => broker.read().await, @@ -1597,14 +1639,7 @@ pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Resul .as_ref() .ok_or(NgError::SessionNotFound)?; - match broker.opened_wallets.get(&session.config.wallet_name()) { - Some(opened_wallet) => { - let user_id = session.config.user_id(); - let site = opened_wallet.wallet.site(&user_id)?; - Ok(site.get_site_store_id(store)) - } - None => Err(NgError::WalletNotFound), - } + broker.get_site_store_of_session(session, store_type) } #[doc(hidden)] diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs index 941ada8..56729dd 100644 --- a/ng-app/src-tauri/src/lib.rs +++ b/ng-app/src-tauri/src/lib.rs @@ -292,6 +292,16 @@ async fn doc_fetch_private_subscribe() -> Result { Ok(request) } +#[tauri::command(rename_all = "snake_case")] +async fn doc_fetch_repo_subscribe(repo_id: String) -> Result { + let request = AppRequest::V0(AppRequestV0 { + command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)), + nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?, + payload: None, + }); + Ok(request) +} + #[tauri::command(rename_all = "snake_case")] async fn app_request( session_id: u64, @@ -499,6 +509,7 @@ impl AppBuilder { user_disconnect, client_info_rust, doc_fetch_private_subscribe, + doc_fetch_repo_subscribe, cancel_stream, app_request_stream, app_request, diff --git a/ng-app/src/App.svelte b/ng-app/src/App.svelte index 465d6a3..82d63c4 100644 --- a/ng-app/src/App.svelte +++ b/ng-app/src/App.svelte @@ -161,7 +161,7 @@ break; case "opened": if (!$opened_wallets[event.data.wallet.id]) { - await tick(); + //await tick(); // console.log( // "ADDING TO OPENED", // event.data.wallet.id, @@ -226,7 +226,7 @@ w[value.id] = value.wallet; return w; }); - await tick(); + //await tick(); //console.log("posting opened"); wallet_channel.postMessage( { diff --git a/ng-app/src/api.ts b/ng-app/src/api.ts index 5c9b107..f83507f 100644 --- a/ng-app/src/api.ts +++ b/ng-app/src/api.ts @@ -33,7 +33,8 @@ const mapping = { "user_disconnect": ["user_id"], "app_request": ["session_id","request"], "test": [ ], - "doc_fetch_private_subscribe": [] + "doc_fetch_private_subscribe": [], + "doc_fetch_repo_subscribe": ["repo_id"], } diff --git a/ng-app/src/lib/FullLayout.svelte b/ng-app/src/lib/FullLayout.svelte index 27ecf8d..ed66831 100644 --- a/ng-app/src/lib/FullLayout.svelte +++ b/ng-app/src/lib/FullLayout.svelte @@ -200,4 +200,8 @@ .full-layout { height: 100vh; } + main { + overflow: hidden; + overflow-wrap: break-word; + } diff --git a/ng-app/src/lib/Login.svelte b/ng-app/src/lib/Login.svelte index 4c89da1..6e743bb 100644 --- a/ng-app/src/lib/Login.svelte +++ b/ng-app/src/lib/Login.svelte @@ -159,9 +159,9 @@ const myWorker = new worker_import.default(); myWorker.onerror = (e) => { console.error(e); - error = e; + error = "WebWorker error"; step = "end"; - dispatch("error", { error: e }); + dispatch("error", { error }); }; myWorker.onmessageerror = (e) => { console.error(e); diff --git a/ng-app/src/lib/Test.svelte b/ng-app/src/lib/Test.svelte index 695d450..3031989 100644 --- a/ng-app/src/lib/Test.svelte +++ b/ng-app/src/lib/Test.svelte @@ -23,7 +23,7 @@ let is_tauri = import.meta.env.TAURI_PLATFORM; - let files = branch_subs("ok"); + let files = $active_session && branch_subs($active_session.private_store_id); let img_map = {}; @@ -253,25 +253,26 @@ bind:this={fileinput} /> + {#if files} + {#await files.load()} +

Currently loading...

+ {:then} + {#each $files as file} +

+ {file.V0.File.name} - {#await files.load()} -

Currently loading...

- {:then} - {#each $files as file} -

- {file.V0.File.name} - - {#await get_img(file.V0.File) then url} - {#if url} - {file.V0.File.name} - {/if} - {/await} -

- {/each} - {/await} + {#await get_img(file.V0.File) then url} + {#if url} + {file.V0.File.name} + {/if} + {/await} +

+ {/each} + {/await} + {/if} {/if} diff --git a/ng-app/src/routes/WalletLogin.svelte b/ng-app/src/routes/WalletLogin.svelte index 3b40997..fe54473 100644 --- a/ng-app/src/routes/WalletLogin.svelte +++ b/ng-app/src/routes/WalletLogin.svelte @@ -56,21 +56,29 @@ }); opened_wallets_unsub = opened_wallets.subscribe(async (value) => { if (!$active_wallet && selected && value[selected]) { - await tick(); + //await tick(); active_wallet.set({ wallet: value[selected], id: selected }); } }); active_wallet_unsub = active_wallet.subscribe(async (value) => { if (value && value.wallet) { if (!$active_session) { - let session = await ng.session_start( - value.id, - value.wallet.V0.personal_site - ); - //console.log(session); - if (session) { - set_active_session(session); - loggedin(); + try { + let session = await ng.session_start( + value.id, + value.wallet.V0.personal_site + ); + //console.log(session); + if (session) { + set_active_session(session); + loggedin(); + } + } catch (e) { + error = e; + importing = false; + wallet = undefined; + selected = undefined; + active_wallet.set(undefined); } } else { loggedin(); @@ -137,7 +145,7 @@ let client = await ng.wallet_was_opened(event.detail.wallet); event.detail.wallet.V0.client = client; } - await tick(); + //await tick(); active_wallet.set(event.detail); // { wallet, // id } @@ -381,7 +389,7 @@ position: absolute; left: 0; padding: 5px; - background-color: #ffffff70; + background-color: #ffffffd0; overflow-wrap: break-word; } .wallet-box:focus .securitytxt { diff --git a/ng-app/src/store.ts b/ng-app/src/store.ts index c6ab2d9..cb7e728 100644 --- a/ng-app/src/store.ts +++ b/ng-app/src/store.ts @@ -93,6 +93,11 @@ export const close_active_session = async function() { active_session.set(undefined); //console.log("setting active_session to undefined",get(active_session)); + for (const branch of Object.keys(all_branches)) { + let sub = all_branches[branch]; + sub.unsubscribe(); + } + } const can_connect = derived([active_wallet, active_session], ([$s1, $s2]) => [ @@ -168,7 +173,7 @@ can_connect.subscribe(async (value) => { } }); -export const branch_subs = function(nura) { +export const branch_subs = function(nuri) { // console.log("branch_commits") // const { subscribe, set, update } = writable([]); // create the underlying writable store @@ -176,7 +181,7 @@ export const branch_subs = function(nura) { // return { // load: async () => { // console.log("load") - // unsub = await ng.doc_sync_branch(nura, async (commit) => { + // unsub = await ng.doc_sync_branch(nuri, async (commit) => { // console.log(commit); // update( (old) => {old.unshift(commit); return old;} ) // }); @@ -201,7 +206,7 @@ export const branch_subs = function(nura) { return { load: async () => { //console.log("load upper"); - let already_subscribed = all_branches[nura]; + let already_subscribed = all_branches[nuri]; if (!already_subscribed) return; if (already_subscribed.load) { //console.log("doing the load"); @@ -213,7 +218,7 @@ export const branch_subs = function(nura) { }, subscribe: (run, invalid) => { - let already_subscribed = all_branches[nura]; + let already_subscribed = all_branches[nuri]; if (!already_subscribed) { const { subscribe, set, update } = writable([]); // create the underlying writable store let count = 0; @@ -230,7 +235,7 @@ export const branch_subs = function(nura) { unsub(); unsub = () => {}; set([]); - unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_private_subscribe(), + unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_repo_subscribe(nuri), async (commit) => { //console.log("GOT APP RESPONSE", commit); update( (old) => {old.unshift(commit); return old;} ) @@ -253,11 +258,16 @@ export const branch_subs = function(nura) { // if (count == 0) { // unsub(); // console.log("removed sub"); - // delete all_branches[nura]; + // delete all_branches[nuri]; // } }, + unsubscribe: () => { + unsub(); + console.log("unsubscribed ",nuri); + delete all_branches[nuri]; + } } - all_branches[nura] = already_subscribed; + all_branches[nuri] = already_subscribed; } let new_store = already_subscribed.increase(); diff --git a/ng-app/src/wallet_emojis.ts b/ng-app/src/wallet_emojis.ts index 1336f1f..47dd420 100644 --- a/ng-app/src/wallet_emojis.ts +++ b/ng-app/src/wallet_emojis.ts @@ -443,7 +443,7 @@ let face = [ { hexcode: "1f6a3", shortcode: "person_rowing_boat", - code: "boat", + code: "rowing_boat", }, { hexcode: "1f3ca", diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index 4c7d348..6c2930b 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -260,6 +260,7 @@ impl RocksDbServerStorage { TopicStorage::get_all_heads(&mut model)?, publisher, topic, + TopicStorage::COMMITS_NBR.get(&mut model)?, )), } } @@ -351,6 +352,7 @@ impl RocksDbServerStorage { TopicStorage::get_all_heads(&mut topic_storage)?, true, *topic_id, + TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, ), ); } @@ -372,6 +374,7 @@ impl RocksDbServerStorage { TopicStorage::get_all_heads(&mut topic_storage)?, false, *topic, + TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )); } result.extend(rw_topics_added.into_values()); @@ -401,6 +404,7 @@ impl RocksDbServerStorage { TopicStorage::get_all_heads(&mut topic_storage)?, false, *topic, + TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )); } } @@ -478,6 +482,7 @@ impl RocksDbServerStorage { TopicStorage::get_all_heads(&mut topic_storage)?, is_publisher, *topic, + TopicStorage::COMMITS_NBR.get(&mut topic_storage)?, )) } @@ -640,6 +645,8 @@ impl RocksDbServerStorage { let head = HashSet::from([commit_id]); //TODO: current_heads in TopicInfo in ServerBroker is not updated (but it isn't used so far) TopicStorage::HEADS.remove_from_set_and_add(&mut topic_storage, past, head)?; + + TopicStorage::COMMITS_NBR.increment(&mut topic_storage)?; } } @@ -652,6 +659,7 @@ impl RocksDbServerStorage { topic: &TopicId, known_heads: &Vec, target_heads: &Vec, + known_commits: &Option, ) -> Result, ServerError> { let overlay = self.check_overlay(overlay)?; // quick solution for now using the Branch::sync_req. TODO: use the saved references (ACKS,DEPS) in the server_storage, to have much quicker responses @@ -670,7 +678,7 @@ impl RocksDbServerStorage { let store = Store::new_from_overlay_id(&overlay, Arc::clone(&self.block_storage)); - let commits = Branch::sync_req(target_heads, known_heads, &store) + let commits = Branch::sync_req(target_heads, known_heads, known_commits, &store) .map_err(|_| ServerError::MalformedBranch)?; let mut result = Vec::with_capacity(commits.len()); diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index 2056cf3..a70b285 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -312,6 +312,19 @@ impl IServerBroker for ServerBroker { self.storage.get_commit(overlay, id) } + fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey) { + for ((overlay, topic), peers) in self.local_subscriptions.iter_mut() { + if peers.remove(remote_peer) { + log_debug!( + "subscription of peer {} to topic {} in overlay {} removed", + remote_peer, + topic, + overlay + ); + } + } + } + fn dispatch_event( &self, overlay: &OverlayId, @@ -344,8 +357,9 @@ impl IServerBroker for ServerBroker { topic: &TopicId, known_heads: &Vec, target_heads: &Vec, + known_commits: &Option, ) -> Result, ServerError> { self.storage - .topic_sync_req(overlay, topic, known_heads, target_heads) + .topic_sync_req(overlay, topic, known_heads, target_heads, known_commits) } } diff --git a/ng-broker/src/server_storage/core/topic.rs b/ng-broker/src/server_storage/core/topic.rs index a8264c4..ddc4a2a 100644 --- a/ng-broker/src/server_storage/core/topic.rs +++ b/ng-broker/src/server_storage/core/topic.rs @@ -53,6 +53,7 @@ impl<'a> TopicStorage<'a> { pub const ADVERT: SingleValueColumn = SingleValueColumn::new(b'a'); pub const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r'); pub const ROOT_COMMIT: SingleValueColumn = SingleValueColumn::new(b'o'); + pub const COMMITS_NBR: CounterValue = CounterValue::new(b'n'); // Topic <-> Users who pinned it (with boolean: R or W) pub const USERS: MultiMapColumn = MultiMapColumn::new(b'u'); @@ -63,7 +64,11 @@ impl<'a> TopicStorage<'a> { "Topic", Some(Self::PREFIX), Some(&Self::REPO), - &[&Self::ADVERT as &dyn ISingleValueColumn, &Self::ROOT_COMMIT], + &[ + &Self::ADVERT as &dyn ISingleValueColumn, + &Self::ROOT_COMMIT, + &Self::COMMITS_NBR, + ], &[&Self::USERS as &dyn IMultiValueColumn, &Self::HEADS], ); diff --git a/ng-net/src/actors/client/event.rs b/ng-net/src/actors/client/event.rs index 154ebcd..90c86f9 100644 --- a/ng-net/src/actors/client/event.rs +++ b/ng-net/src/actors/client/event.rs @@ -82,7 +82,6 @@ impl EActor for Actor<'_, PublishEvent, ()> { // send a ProtocolError if invalid signatures (will disconnect the client) req.event().verify()?; - let broker = BROKER.read().await; let overlay = req.overlay().clone(); let (user_id, remote_peer) = { let fsm = _fsm.lock().await; @@ -91,10 +90,12 @@ impl EActor for Actor<'_, PublishEvent, ()> { fsm.remote_peer().ok_or(ProtocolError::ActorError)?, ) }; - let res = broker - .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer) - .await; - + let res = { + let mut broker = BROKER.write().await; + broker + .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer) + .await + }; _fsm.lock() .await .send_in_reply_to(res.into(), self.id()) diff --git a/ng-net/src/actors/client/topic_sync_req.rs b/ng-net/src/actors/client/topic_sync_req.rs index c11de9f..97870a5 100644 --- a/ng-net/src/actors/client/topic_sync_req.rs +++ b/ng-net/src/actors/client/topic_sync_req.rs @@ -34,6 +34,7 @@ impl TopicSyncReq { known_heads: vec![], target_heads: vec![], overlay: Some(*overlay), + known_commits: None, }) } @@ -42,12 +43,14 @@ impl TopicSyncReq { topic_id: &TopicId, known_heads: Vec, target_heads: Vec, + known_commits: Option, ) -> TopicSyncReq { TopicSyncReq::V0(TopicSyncReqV0 { topic: *topic_id, known_heads, target_heads, overlay: Some(repo.store.get_store_repo().overlay_id_for_read_purpose()), + known_commits, }) } } @@ -111,6 +114,7 @@ impl EActor for Actor<'_, TopicSyncReq, TopicSyncRes> { req.topic(), req.known_heads(), req.target_heads(), + req.known_commits(), ); // IF NEEDED, the topic_sync_req could be changed to return a stream, and then the send_in_reply_to would be also totally async diff --git a/ng-net/src/broker.rs b/ng-net/src/broker.rs index bcb0d24..f1a0baf 100644 --- a/ng-net/src/broker.rs +++ b/ng-net/src/broker.rs @@ -336,6 +336,11 @@ impl Broker { let _ = self.remove_user_peer(&user, &peer_id); } } + let peer = PubKey::X25519PubKey(peer_id); + log_debug!("unsubscribing peer {}", peer); + self.get_server_broker_mut() + .unwrap() + .remove_all_subscriptions_of_peer(&peer); } } PeerConnection::Core(ip) => { @@ -834,7 +839,7 @@ impl Broker { local_broker.write().await.user_disconnected(user).await; } } else { - log_info!("REMOVED"); + log_debug!("REMOVED"); BROKER .write() .await @@ -879,7 +884,7 @@ impl Broker { #[cfg(not(target_arch = "wasm32"))] pub async fn dispatch_event( - &self, + &mut self, overlay: &OverlayId, event: Event, user_id: &UserId, @@ -887,12 +892,12 @@ impl Broker { ) -> Result<(), ServerError> { // TODO: deal with subscriptions on the outer overlay. for now we assume everything is on the inner overlay - let peers_for_local_dispatch = self.get_server_broker()?.dispatch_event( - overlay, - event.clone(), - user_id, - remote_peer, - )?; + let peers_for_local_dispatch: Vec = self + .get_server_broker()? + .dispatch_event(overlay, event.clone(), user_id, remote_peer)? + .into_iter() + .cloned() + .collect(); //log_debug!("dispatch_event {:?}", peers_for_local_dispatch); @@ -901,7 +906,7 @@ impl Broker { if let Some(BrokerPeerInfo { connected: PeerConnection::Client(ConnectionBase { fsm: Some(fsm), .. }), .. - }) = self.peers.get(&(None, peer.to_owned().to_dh())) + }) = self.peers.get(&(None, peer.to_dh())) { //log_debug!("ForwardedEvent peer {:?}", peer); let _ = fsm @@ -915,6 +920,10 @@ impl Broker { }, ))) .await; + } else { + // we remove the peer from all local_subscriptions + self.get_server_broker_mut()? + .remove_all_subscriptions_of_peer(&peer); } } diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs index e9970e7..43f8340 100644 --- a/ng-net/src/server_broker.rs +++ b/ng-net/src/server_broker.rs @@ -91,11 +91,14 @@ pub trait IServerBroker: Send + Sync { remote_peer: &PubKey, ) -> Result, ServerError>; + fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey); + fn topic_sync_req( &self, overlay: &OverlayId, topic: &TopicId, known_heads: &Vec, target_heads: &Vec, + known_commits: &Option, ) -> Result, ServerError>; } diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index c5e1296..cd69217 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -1716,6 +1716,9 @@ pub struct TopicSyncReqV0 { /// if empty, the local HEAD at the responder is used instead pub target_heads: Vec, + /// optional Bloom filter of all the commit IDs present locally (used in case of detected fork) + pub known_commits: Option, + #[serde(skip)] pub overlay: Option, } @@ -1752,6 +1755,11 @@ impl TopicSyncReq { TopicSyncReq::V0(o) => &o.target_heads, } } + pub fn known_commits(&self) -> &Option { + match self { + TopicSyncReq::V0(o) => &o.known_commits, + } + } } /// Status of a Forwarded Peer, sent in the Advert @@ -3380,6 +3388,7 @@ pub struct TopicSubResV0 { pub topic: TopicId, pub known_heads: Vec, pub publisher: bool, + pub commits_nbr: u64, } /// Topic subscription response @@ -3401,11 +3410,17 @@ impl TopicSubRes { Self::V0(v0) => v0.publisher, } } - pub fn new_from_heads(topics: HashSet, publisher: bool, topic: TopicId) -> Self { + pub fn new_from_heads( + topics: HashSet, + publisher: bool, + topic: TopicId, + commits_nbr: u64, + ) -> Self { TopicSubRes::V0(TopicSubResV0 { topic, known_heads: topics.into_iter().collect(), publisher, + commits_nbr, }) } pub fn known_heads(&self) -> &Vec { @@ -3413,6 +3428,11 @@ impl TopicSubRes { Self::V0(v0) => &v0.known_heads, } } + pub fn commits_nbr(&self) -> u64 { + match self { + Self::V0(v0) => v0.commits_nbr, + } + } } impl From for TopicSubRes { @@ -3421,6 +3441,7 @@ impl From for TopicSubRes { topic, known_heads: vec![], publisher: false, + commits_nbr: 0, }) } } @@ -3431,6 +3452,7 @@ impl From for TopicSubRes { topic: topic.topic_id().clone(), known_heads: vec![], publisher: true, + commits_nbr: 0, }) } } diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs index d3b24f5..5380ab7 100644 --- a/ng-repo/src/branch.rs +++ b/ng-repo/src/branch.rs @@ -13,8 +13,8 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt; +use fastbloom_rs::{BloomFilter as Filter, Membership}; use zeroize::Zeroize; -// use fastbloom_rs::{BloomFilter as Filter, Membership}; use crate::errors::*; #[allow(unused_imports)] @@ -165,12 +165,20 @@ impl Branch { missing: &mut Option<&mut HashSet>, future: Option, theirs_found: &mut Option<&mut HashSet>, + theirs_filter: &Option, ) -> Result<(), ObjectParseError> { let id = cobj.id(); // check if this commit object is present in theirs or has already been visited in the current walk // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads - if !theirs.contains(&id) { + + let found_in_filter = if let Some(filter) = theirs_filter { + filter.contains(id.slice()) + } else { + false + }; + + if !found_in_filter && !theirs.contains(&id) { if let Some(past) = visited.get_mut(&id) { // we update the future if let Some(f) = future { @@ -193,6 +201,7 @@ impl Branch { missing, Some(id), theirs_found, + theirs_filter, )?; } Err(ObjectParseError::MissingBlocks(blocks)) => { @@ -219,13 +228,9 @@ impl Branch { pub fn sync_req( target_heads: impl Iterator, known_heads: &[ObjectId], - //their_filter: &BloomFilter, + known_commits: &Option, store: &Store, ) -> Result, ObjectParseError> { - //log_debug!(">> sync_req"); - //log_debug!(" target_heads: {:?}", target_heads); - //log_debug!(" known_heads: {:?}", known_heads); - // their commits let mut theirs: HashMap = HashMap::new(); @@ -240,6 +245,7 @@ impl Branch { &mut None, None, &mut None, + &None, )?; } // we silently discard any load error on the known_heads as the responder might not know them (yet). @@ -249,6 +255,10 @@ impl Branch { let theirs: HashSet = theirs.keys().into_iter().cloned().collect(); + let filter = known_commits + .as_ref() + .map(|their_filter| Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k)); + // collect all commits reachable from target_heads // up to the root or until encountering a commit from theirs for id in target_heads { @@ -261,27 +271,12 @@ impl Branch { &mut None, None, &mut None, + &filter, )?; } // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally. } - //log_debug!("!! ours: {:?}", ours); - //log_debug!("!! theirs: {:?}", theirs); - - // remove their_commits from result - // let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into()); - // for id in result.clone() { - // match id { - // Digest::Blake3Digest32(d) => { - // if filter.contains(&d) { - // result.remove(&id); - // } - // } - // } - // } - //log_debug!("!! result filtered: {:?}", result); - // now ordering to respect causal partial order. let mut next_generations = HashSet::new(); for (_, node) in visited.iter() { @@ -297,6 +292,11 @@ impl Branch { result.append(&mut DagNode::collapse(first, &visited)); } + // #[cfg(debug_assertions)] + // for _res in result.iter() { + // log_debug!("sending missing commit {}", _res); + // } + Ok(result) } } @@ -574,7 +574,7 @@ mod test { let ids = Branch::sync_req( [t5.id, a6.id, a7.id].into_iter(), &[t5.id], - //&their_commits, + &None, &repo.store, ) .unwrap(); diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 178964c..623ce50 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -76,6 +76,7 @@ pub enum NgError { WrongUploadId, FileError(FileError), InternalError, + OxiGraphError(String), } impl Error for NgError {} diff --git a/ng-repo/src/kcv_storage.rs b/ng-repo/src/kcv_storage.rs index 170042c..ba3efc4 100644 --- a/ng-repo/src/kcv_storage.rs +++ b/ng-repo/src/kcv_storage.rs @@ -674,22 +674,6 @@ pub trait IMultiValueColumn { fn value_size(&self) -> Result; } -pub struct ExistentialValueColumn { - suffix: u8, -} - -impl ISingleValueColumn for ExistentialValueColumn { - fn suffix(&self) -> u8 { - self.suffix - } -} - -impl ExistentialValueColumn { - pub const fn new(suffix: u8) -> Self { - ExistentialValueColumn { suffix } - } -} - pub struct SingleValueColumn Deserialize<'a>> { suffix: u8, phantom_value: PhantomData, @@ -779,6 +763,108 @@ impl Deserialize<'d>> } } +///////////// Counter Value + +pub struct CounterValue { + suffix: u8, + phantom_model: PhantomData, +} + +impl ISingleValueColumn for CounterValue { + fn suffix(&self) -> u8 { + self.suffix + } +} + +impl CounterValue { + pub const fn new(suffix: u8) -> Self { + CounterValue { + suffix, + phantom_model: PhantomData, + } + } + + pub fn increment(&self, model: &mut Model) -> Result<(), StorageError> { + model.storage().write_transaction(&mut |tx| { + let mut val: u64 = match tx.get(model.prefix(), model.key(), Some(self.suffix), &None) { + Ok(val_ser) => from_slice(&val_ser)?, + Err(StorageError::NotFound) => 0, + Err(e) => return Err(e), + }; + val += 1; + let val_ser = to_vec(&val)?; + tx.put( + model.prefix(), + model.key(), + Some(self.suffix), + &val_ser, + &None, + )?; + Ok(()) + }) + } + /// returns true if the counter reached zero, and the property was removed + pub fn decrement(&self, model: &mut Model) -> Result { + let mut ret: bool = false; + model.storage().write_transaction(&mut |tx| { + let val_ser = tx.get(model.prefix(), model.key(), Some(self.suffix), &None)?; + let mut val: u64 = from_slice(&val_ser)?; + val -= 1; + ret = val == 0; + if ret { + tx.del(model.prefix(), model.key(), Some(self.suffix), &None)?; + } else { + let val_ser = to_vec(&val)?; + tx.put( + model.prefix(), + model.key(), + Some(self.suffix), + &val_ser, + &None, + )?; + } + Ok(()) + })?; + Ok(ret) + } + + pub fn get(&self, model: &mut Model) -> Result { + let val_res = model + .storage() + .get(model.prefix(), model.key(), Some(self.suffix), &None); + match val_res { + Ok(val_ser) => Ok(from_slice(&val_ser)?), + Err(StorageError::NotFound) => Ok(0), + Err(e) => Err(e), + } + } + + pub fn del(&self, model: &mut Model) -> Result<(), StorageError> { + model.check_exists()?; + model + .storage() + .del(model.prefix(), model.key(), Some(self.suffix), &None) + } +} + +//////////////// + +pub struct ExistentialValueColumn { + suffix: u8, +} + +impl ISingleValueColumn for ExistentialValueColumn { + fn suffix(&self) -> u8 { + self.suffix + } +} + +impl ExistentialValueColumn { + pub const fn new(suffix: u8) -> Self { + ExistentialValueColumn { suffix } + } +} + pub struct ExistentialValue Deserialize<'d>> { value: Option, value_ser: Vec, diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs index b9c2b0b..b0d9b57 100644 --- a/ng-repo/src/repo.rs +++ b/ng-repo/src/repo.rs @@ -98,6 +98,8 @@ pub struct BranchInfo { pub read_cap: ReadCap, pub current_heads: Vec, + + pub commits_nbr: u64, } /// In memory Repository representation. With helper functions that access the underlying UserStore and keeps proxy of the values @@ -173,6 +175,7 @@ impl Repo { } branch.current_heads = set.into_iter().cloned().collect(); branch.current_heads.push(commit_ref); + branch.commits_nbr += 1; // we return the new current heads Ok(branch.current_heads.to_vec()) } else { diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs index c55400e..9329a70 100644 --- a/ng-repo/src/store.rs +++ b/ng-repo/src/store.rs @@ -256,6 +256,7 @@ impl Store { topic_priv_key: Some(branch_topic_priv_key), read_cap: branch_read_cap, current_heads: vec![], + commits_nbr: 0, }; Ok((branch_commit, add_branch_commit, branch_info)) @@ -609,6 +610,7 @@ impl Store { topic_priv_key: Some(topic_priv_key), read_cap: root_branch_readcap.clone(), current_heads: vec![sync_sig_on_root_branch_commit_ref], + commits_nbr: 0, }; branches.push((root_branch.id, root_branch)); diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 62cb24b..e97c87b 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -43,6 +43,11 @@ impl Digest { pub fn from_slice(slice: [u8; 32]) -> Digest { Digest::Blake3Digest32(slice) } + pub fn slice(&self) -> &[u8; 32] { + match self { + Self::Blake3Digest32(o) => o, + } + } } impl fmt::Display for Digest { diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 57d9876..d0abb67 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -585,6 +585,17 @@ pub async fn doc_fetch_private_subscribe() -> Result { Ok(serde_wasm_bindgen::to_value(&request).unwrap()) } +#[cfg(target_arch = "wasm32")] +#[wasm_bindgen] +pub async fn doc_fetch_repo_subscribe(repo_id: String) -> Result { + let request = AppRequest::V0(AppRequestV0 { + command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)), + nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?, + payload: None, + }); + Ok(serde_wasm_bindgen::to_value(&request).unwrap()) +} + // #[cfg(target_arch = "wasm32")] // #[wasm_bindgen] // pub async fn get_readcap() -> Result { diff --git a/ng-verifier/Cargo.toml b/ng-verifier/Cargo.toml index 87a5107..edf9489 100644 --- a/ng-verifier/Cargo.toml +++ b/ng-verifier/Cargo.toml @@ -36,6 +36,7 @@ web-time = "0.2.0" either = "1.8.1" futures = "0.3.24" async-trait = "0.1.64" +fastbloom-rs = "0.5.3" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" } diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index 3d30eb1..46d7ff4 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -111,6 +111,7 @@ impl CommitVerifier for RootBranch { topic_priv_key, read_cap: reference.clone(), current_heads: vec![reference.clone()], + commits_nbr: 1, }; let id = root_branch.id; let branches = vec![(root_branch.id, root_branch)]; @@ -251,6 +252,7 @@ impl CommitVerifier for AddBranch { topic_priv_key: None, read_cap: v0.branch_read_cap.clone(), current_heads: vec![], + commits_nbr: 0, }; verifier.add_branch_and_save( diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 05b4c0b..7dfa48d 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -130,6 +130,14 @@ impl AppRequestCommandV0 { }; Ok((repo_id, branch, store_repo)) } + NuriTargetV0::Repo(repo_id) => { + let (branch, store_repo) = { + let repo = verifier.repos.get(repo_id).ok_or(NgError::RepoNotFound)?; + let branch = repo.main_branch().ok_or(NgError::BranchNotFound)?; + (branch.id, repo.store.get_store_repo().clone()) + }; + Ok((*repo_id, branch, store_repo)) + } _ => unimplemented!(), } } diff --git a/ng-verifier/src/types.rs b/ng-verifier/src/types.rs index bbb86f2..0ca4358 100644 --- a/ng-verifier/src/types.rs +++ b/ng-verifier/src/types.rs @@ -221,6 +221,20 @@ pub struct NuriV0 { } impl NuriV0 { + pub fn new_repo_target_from_string(repo_id_string: String) -> Result { + let repo_id: RepoId = repo_id_string.as_str().try_into()?; + Ok(Self { + target: NuriTargetV0::Repo(repo_id), + entire_store: false, + object: None, + branch: None, + overlay: None, + access: vec![], + topic: None, + locator: vec![], + }) + } + pub fn new_private_store_target() -> Self { Self { target: NuriTargetV0::PrivateStore, diff --git a/ng-verifier/src/user_storage/branch.rs b/ng-verifier/src/user_storage/branch.rs index 0170119..c851821 100644 --- a/ng-verifier/src/user_storage/branch.rs +++ b/ng-verifier/src/user_storage/branch.rs @@ -15,6 +15,8 @@ use serde_bare::to_vec; use ng_repo::errors::StorageError; use ng_repo::kcv_storage::prop; use ng_repo::kcv_storage::KCVStorage; +#[allow(unused_imports)] +use ng_repo::log::*; use ng_repo::repo::BranchInfo; use ng_repo::types::*; @@ -33,8 +35,15 @@ impl<'a> BranchStorage<'a> { const PUBLISHER: u8 = b'p'; const READ_CAP: u8 = b'r'; const TOPIC: u8 = b't'; + const COMMITS_NBR: u8 = b'n'; - const ALL_PROPERTIES: [u8; 4] = [Self::TYPE, Self::PUBLISHER, Self::READ_CAP, Self::TOPIC]; + const ALL_PROPERTIES: [u8; 5] = [ + Self::TYPE, + Self::PUBLISHER, + Self::READ_CAP, + Self::TOPIC, + Self::COMMITS_NBR, + ]; const PREFIX_HEADS: u8 = b'h'; @@ -136,6 +145,7 @@ impl<'a> BranchStorage<'a> { topic: prop(Self::TOPIC, &props)?, topic_priv_key: prop(Self::PUBLISHER, &props).ok(), current_heads: Self::get_all_heads(id, storage)?, + commits_nbr: prop(Self::COMMITS_NBR, &props).unwrap_or(0), }; Ok(bs) } @@ -227,6 +237,28 @@ impl<'a> BranchStorage<'a> { key.append(&mut head_ser); tx.put(Self::PREFIX_HEADS, &key, None, &vec![], &None)?; } + + let mut val: u64 = match tx.get(Self::PREFIX, id_ser, Some(Self::COMMITS_NBR), &None) { + Ok(val_ser) => from_slice(&val_ser)?, + Err(StorageError::NotFound) => 0, + Err(e) => return Err(e), + }; + val += 1; + let val_ser = to_vec(&val)?; + tx.put( + Self::PREFIX, + id_ser, + Some(Self::COMMITS_NBR), + &val_ser, + &None, + )?; + // log_info!( + // "putting commit_nbr {} {:?} {} {:?}", + // Self::PREFIX as char, + // id_ser, + // Self::COMMITS_NBR as char, + // val_ser + // ); Ok(()) }) } diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 09bc507..9930e28 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc}; use async_std::stream::StreamExt; use async_std::sync::{Mutex, RwLockReadGuard}; +use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Hashes, Membership}; use futures::channel::mpsc; use futures::SinkExt; use ng_repo::object::Object; @@ -191,7 +192,7 @@ impl Verifier { // ); if let Some(sender) = self.branch_subscriptions.get_mut(branch) { if sender.is_closed() { - //log_info!("closed so removed"); + log_info!("closed so removed {}", branch); self.branch_subscriptions.remove(branch); } else { let _ = sender.send(response).await; @@ -227,7 +228,7 @@ impl Verifier { } let fnonce = Box::new(move || { - //log_info!("CLOSE_CHANNEL"); + log_info!("CLOSE_CHANNEL of subscription for branch {}", branch); if !tx.is_closed() { tx.close_channel(); } @@ -739,7 +740,7 @@ impl Verifier { .ok_or(NgError::TopicNotFound)? .to_owned(); - self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref); + self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref)?; if self.connected_server_id.is_some() { // send the event to the server already @@ -965,6 +966,7 @@ impl Verifier { branch, repo_id, topic.known_heads(), + topic.commits_nbr(), ) .await?; break; @@ -999,6 +1001,7 @@ impl Verifier { branch, repo_id, topic.known_heads(), + topic.commits_nbr(), ) .await?; break; @@ -1042,6 +1045,7 @@ impl Verifier { branch, repo_id, sub.known_heads(), + sub.commits_nbr(), ) .await?; } @@ -1167,7 +1171,7 @@ impl Verifier { if res.is_ok() && !skip_heads_update { let commit_ref = commit.reference().unwrap(); let past = commit.direct_causal_past(); - self.update_branch_current_heads(repo_id, branch_id, past, commit_ref); + self.update_branch_current_heads(repo_id, branch_id, past, commit_ref)?; Ok(()) } else { res @@ -1291,8 +1295,14 @@ impl Verifier { branch_id: &BranchId, repo_id: &RepoId, remote_heads: &Vec, + remote_commits_nbr: u64, ) -> Result<(), NgError> { let (store, msg, branch_secret) = { + if remote_commits_nbr == 0 || remote_heads.is_empty() { + log_info!("branch is new on the broker. doing nothing"); + return Ok(()); + } + let repo = self.repos.get(repo_id).unwrap(); let branch_info = repo.branch(branch_id)?; @@ -1302,10 +1312,7 @@ impl Verifier { let ours_set: HashSet = HashSet::from_iter(ours.clone()); let theirs = HashSet::from_iter(remote_heads.clone().into_iter()); - if theirs.is_empty() { - log_info!("branch is new on the broker. doing nothing"); - return Ok(()); - } + if ours_set.difference(&theirs).count() == 0 && theirs.difference(&ours_set).count() == 0 { @@ -1326,6 +1333,7 @@ impl Verifier { &mut None, None, &mut Some(&mut theirs_found), + &None, ); } } @@ -1333,16 +1341,33 @@ impl Verifier { let theirs_not_found: Vec = theirs.difference(&theirs_found).cloned().collect(); - if theirs_not_found.is_empty() { + let known_commits = if theirs_not_found.is_empty() { return Ok(()); } else { - // prepare bloom filter - } + if visited.is_empty() { + None + } else { + // prepare bloom filter + let expected_elements = + remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr); + let mut config = FilterBuilder::new(expected_elements, 0.01); + config.enable_repeat_insert(false); + let mut filter = Filter::new(config); + for commit_id in visited.keys() { + filter.add(commit_id.slice()); + } + Some(BloomFilter { + k: filter.hashes(), + f: filter.get_u8_array().to_vec(), + }) + } + }; let msg = TopicSyncReq::V0(TopicSyncReqV0 { topic: branch_info.topic, known_heads: ours_set.union(&theirs_found).into_iter().cloned().collect(), target_heads: theirs_not_found, + known_commits, overlay: Some(store.overlay_for_read_on_client_protocol()), }); (store, msg, branch_info.read_cap.key.clone()) @@ -1359,6 +1384,8 @@ impl Verifier { .event() .open(&store, repo_id, branch_id, &branch_secret)?; + // TODO: deal with missing commits in the DAG (fetch them individually with CommitGet). This can happen because of false positive on BloomFilter + self.verify_commit(&commit, branch_id, repo_id, Arc::clone(&store)) .await?; } @@ -1940,7 +1967,7 @@ impl Verifier { // and have oxigraph use directly the UserStorage Some( oxigraph::store::Store::open_with_key(path_oxi, config.user_master_key) - .unwrap(), + .map_err(|e| NgError::OxiGraphError(e.to_string()))?, ), Some(Box::new(RocksDbUserStorage::open( &path_user, diff --git a/ng-wallet/src/emojis.rs b/ng-wallet/src/emojis.rs index 48e425a..0f20ed7 100644 --- a/ng-wallet/src/emojis.rs +++ b/ng-wallet/src/emojis.rs @@ -427,7 +427,7 @@ const sport: [EmojiDef<'static>; 15] = [ EmojiDef { hexcode: "1f6a3", shortcode: "person_rowing_boat", - code: "boat", + code: "rowing_boat", }, EmojiDef { hexcode: "1f3ca", diff --git a/ng-wallet/src/types.rs b/ng-wallet/src/types.rs index 72953bd..630b86d 100644 --- a/ng-wallet/src/types.rs +++ b/ng-wallet/src/types.rs @@ -133,6 +133,7 @@ impl SessionWalletStorageV0 { pub struct SessionInfo { pub session_id: u64, pub user: UserId, + pub private_store_id: String, } #[derive(Clone, Debug, Serialize, Deserialize)]