diff --git a/Cargo.lock b/Cargo.lock
index 654face..ab48578 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3429,6 +3429,7 @@ dependencies = [
"blake3",
"chacha20",
"either",
+ "fastbloom-rs",
"futures",
"getrandom 0.2.10",
"ng-net",
diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs
index 5e6744f..dd35337 100644
--- a/nextgraph/src/local_broker.rs
+++ b/nextgraph/src/local_broker.rs
@@ -478,6 +478,21 @@ impl LocalBroker {
}
}
+ pub fn get_site_store_of_session(
+ &self,
+ session: &Session,
+ store_type: SiteStoreType,
+ ) -> Result<PubKey, NgError> {
+ match self.opened_wallets.get(&session.config.wallet_name()) {
+ Some(opened_wallet) => {
+ let user_id = session.config.user_id();
+ let site = opened_wallet.wallet.site(&user_id)?;
+ Ok(site.get_site_store_id(store_type))
+ }
+ None => Err(NgError::WalletNotFound),
+ }
+ }
+
fn verifier_config_type_from_session_config(
&self,
config: &SessionConfig,
@@ -609,6 +624,8 @@ impl LocalBroker {
mut config: SessionConfig,
user_priv_key: Option<PrivKey>,
) -> Result<SessionInfo, NgError> {
+ let intermediary_step = user_priv_key.is_some();
+
let broker = self;
let wallet_name: String = config.wallet_name();
@@ -640,6 +657,9 @@ impl LocalBroker {
return Ok(SessionInfo {
session_id: *idx,
user: user_id,
+ private_store_id: broker
+ .get_site_store_of_session(sess, SiteStoreType::Private)?
+ .to_string(),
});
}
}
@@ -807,19 +827,28 @@ impl LocalBroker {
//load verifier from local_storage (if rocks_db)
let _ = verifier.load();
-
- broker.opened_sessions_list.push(Some(Session {
+ let session = Session {
config,
peer_key: session.peer_key.clone(),
last_wallet_nonce: session.last_wallet_nonce,
verifier,
- }));
+ };
+ let private_store_id = if intermediary_step {
+ "".to_string()
+ } else {
+ broker
+ .get_site_store_of_session(&session, SiteStoreType::Private)?
+ .to_string()
+ };
+
+ broker.opened_sessions_list.push(Some(session));
let idx = broker.opened_sessions_list.len() - 1;
broker.opened_sessions.insert(user_id, idx as u64);
Ok(SessionInfo {
session_id: idx as u64,
user: user_id,
+ private_store_id,
})
}
}
@@ -1031,7 +1060,7 @@ pub async fn wallet_create_v0(params: CreateWalletV0) -> Result
-pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Result<PubKey, NgError> {
+pub async fn personal_site_store(
+ session_id: u64,
+ store_type: SiteStoreType,
+) -> Result<PubKey, NgError> {
let broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized),
Some(Ok(broker)) => broker.read().await,
@@ -1597,14 +1639,7 @@ pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Result<PubKey, NgError> {
.as_ref()
.ok_or(NgError::SessionNotFound)?;
- match broker.opened_wallets.get(&session.config.wallet_name()) {
- Some(opened_wallet) => {
- let user_id = session.config.user_id();
- let site = opened_wallet.wallet.site(&user_id)?;
- Ok(site.get_site_store_id(store))
- }
- None => Err(NgError::WalletNotFound),
- }
+ broker.get_site_store_of_session(session, store_type)
}
#[doc(hidden)]
diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs
index 941ada8..56729dd 100644
--- a/ng-app/src-tauri/src/lib.rs
+++ b/ng-app/src-tauri/src/lib.rs
@@ -292,6 +292,16 @@ async fn doc_fetch_private_subscribe() -> Result<AppRequest, String> {
Ok(request)
}
+#[tauri::command(rename_all = "snake_case")]
+async fn doc_fetch_repo_subscribe(repo_id: String) -> Result<AppRequest, String> {
+ let request = AppRequest::V0(AppRequestV0 {
+ command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)),
+ nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?,
+ payload: None,
+ });
+ Ok(request)
+}
+
#[tauri::command(rename_all = "snake_case")]
async fn app_request(
session_id: u64,
@@ -499,6 +509,7 @@ impl AppBuilder {
user_disconnect,
client_info_rust,
doc_fetch_private_subscribe,
+ doc_fetch_repo_subscribe,
cancel_stream,
app_request_stream,
app_request,
diff --git a/ng-app/src/App.svelte b/ng-app/src/App.svelte
index 465d6a3..82d63c4 100644
--- a/ng-app/src/App.svelte
+++ b/ng-app/src/App.svelte
@@ -161,7 +161,7 @@
break;
case "opened":
if (!$opened_wallets[event.data.wallet.id]) {
- await tick();
+ //await tick();
// console.log(
// "ADDING TO OPENED",
// event.data.wallet.id,
@@ -226,7 +226,7 @@
w[value.id] = value.wallet;
return w;
});
- await tick();
+ //await tick();
//console.log("posting opened");
wallet_channel.postMessage(
{
diff --git a/ng-app/src/api.ts b/ng-app/src/api.ts
index 5c9b107..f83507f 100644
--- a/ng-app/src/api.ts
+++ b/ng-app/src/api.ts
@@ -33,7 +33,8 @@ const mapping = {
"user_disconnect": ["user_id"],
"app_request": ["session_id","request"],
"test": [ ],
- "doc_fetch_private_subscribe": []
+ "doc_fetch_private_subscribe": [],
+ "doc_fetch_repo_subscribe": ["repo_id"],
}
diff --git a/ng-app/src/lib/FullLayout.svelte b/ng-app/src/lib/FullLayout.svelte
index 27ecf8d..ed66831 100644
--- a/ng-app/src/lib/FullLayout.svelte
+++ b/ng-app/src/lib/FullLayout.svelte
@@ -200,4 +200,8 @@
.full-layout {
height: 100vh;
}
+ main {
+ overflow: hidden;
+ overflow-wrap: break-word;
+ }
diff --git a/ng-app/src/lib/Login.svelte b/ng-app/src/lib/Login.svelte
index 4c89da1..6e743bb 100644
--- a/ng-app/src/lib/Login.svelte
+++ b/ng-app/src/lib/Login.svelte
@@ -159,9 +159,9 @@
const myWorker = new worker_import.default();
myWorker.onerror = (e) => {
console.error(e);
- error = e;
+ error = "WebWorker error";
step = "end";
- dispatch("error", { error: e });
+ dispatch("error", { error });
};
myWorker.onmessageerror = (e) => {
console.error(e);
diff --git a/ng-app/src/lib/Test.svelte b/ng-app/src/lib/Test.svelte
index 695d450..3031989 100644
--- a/ng-app/src/lib/Test.svelte
+++ b/ng-app/src/lib/Test.svelte
@@ -23,7 +23,7 @@
let is_tauri = import.meta.env.TAURI_PLATFORM;
- let files = branch_subs("ok");
+ let files = $active_session && branch_subs($active_session.private_store_id);
let img_map = {};
@@ -253,25 +253,26 @@
bind:this={fileinput}
/>
+ {#if files}
+ {#await files.load()}
+ Currently loading...
+ {:then}
+ {#each $files as file}
+
+ {file.V0.File.name}
- {#await files.load()}
-
Currently loading...
- {:then}
- {#each $files as file}
-
- {file.V0.File.name}
-
- {#await get_img(file.V0.File) then url}
- {#if url}
-
- {/if}
- {/await}
-
- {/each}
- {/await}
+ {#await get_img(file.V0.File) then url}
+ {#if url}
+
+ {/if}
+ {/await}
+
+ {/each}
+ {/await}
+ {/if}
{/if}
diff --git a/ng-app/src/routes/WalletLogin.svelte b/ng-app/src/routes/WalletLogin.svelte
index 3b40997..fe54473 100644
--- a/ng-app/src/routes/WalletLogin.svelte
+++ b/ng-app/src/routes/WalletLogin.svelte
@@ -56,21 +56,29 @@
});
opened_wallets_unsub = opened_wallets.subscribe(async (value) => {
if (!$active_wallet && selected && value[selected]) {
- await tick();
+ //await tick();
active_wallet.set({ wallet: value[selected], id: selected });
}
});
active_wallet_unsub = active_wallet.subscribe(async (value) => {
if (value && value.wallet) {
if (!$active_session) {
- let session = await ng.session_start(
- value.id,
- value.wallet.V0.personal_site
- );
- //console.log(session);
- if (session) {
- set_active_session(session);
- loggedin();
+ try {
+ let session = await ng.session_start(
+ value.id,
+ value.wallet.V0.personal_site
+ );
+ //console.log(session);
+ if (session) {
+ set_active_session(session);
+ loggedin();
+ }
+ } catch (e) {
+ error = e;
+ importing = false;
+ wallet = undefined;
+ selected = undefined;
+ active_wallet.set(undefined);
}
} else {
loggedin();
@@ -137,7 +145,7 @@
let client = await ng.wallet_was_opened(event.detail.wallet);
event.detail.wallet.V0.client = client;
}
- await tick();
+ //await tick();
active_wallet.set(event.detail);
// { wallet,
// id }
@@ -381,7 +389,7 @@
position: absolute;
left: 0;
padding: 5px;
- background-color: #ffffff70;
+ background-color: #ffffffd0;
overflow-wrap: break-word;
}
.wallet-box:focus .securitytxt {
diff --git a/ng-app/src/store.ts b/ng-app/src/store.ts
index c6ab2d9..cb7e728 100644
--- a/ng-app/src/store.ts
+++ b/ng-app/src/store.ts
@@ -93,6 +93,11 @@ export const close_active_session = async function() {
active_session.set(undefined);
//console.log("setting active_session to undefined",get(active_session));
+ for (const branch of Object.keys(all_branches)) {
+ let sub = all_branches[branch];
+ sub.unsubscribe();
+ }
+
}
const can_connect = derived([active_wallet, active_session], ([$s1, $s2]) => [
@@ -168,7 +173,7 @@ can_connect.subscribe(async (value) => {
}
});
-export const branch_subs = function(nura) {
+export const branch_subs = function(nuri) {
// console.log("branch_commits")
// const { subscribe, set, update } = writable([]); // create the underlying writable store
@@ -176,7 +181,7 @@ export const branch_subs = function(nura) {
// return {
// load: async () => {
// console.log("load")
- // unsub = await ng.doc_sync_branch(nura, async (commit) => {
+ // unsub = await ng.doc_sync_branch(nuri, async (commit) => {
// console.log(commit);
// update( (old) => {old.unshift(commit); return old;} )
// });
@@ -201,7 +206,7 @@ export const branch_subs = function(nura) {
return {
load: async () => {
//console.log("load upper");
- let already_subscribed = all_branches[nura];
+ let already_subscribed = all_branches[nuri];
if (!already_subscribed) return;
if (already_subscribed.load) {
//console.log("doing the load");
@@ -213,7 +218,7 @@ export const branch_subs = function(nura) {
},
subscribe: (run, invalid) => {
- let already_subscribed = all_branches[nura];
+ let already_subscribed = all_branches[nuri];
if (!already_subscribed) {
const { subscribe, set, update } = writable([]); // create the underlying writable store
let count = 0;
@@ -230,7 +235,7 @@ export const branch_subs = function(nura) {
unsub();
unsub = () => {};
set([]);
- unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_private_subscribe(),
+ unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_repo_subscribe(nuri),
async (commit) => {
//console.log("GOT APP RESPONSE", commit);
update( (old) => {old.unshift(commit); return old;} )
@@ -253,11 +258,16 @@ export const branch_subs = function(nura) {
// if (count == 0) {
// unsub();
// console.log("removed sub");
- // delete all_branches[nura];
+ // delete all_branches[nuri];
// }
},
+ unsubscribe: () => {
+ unsub();
+ console.log("unsubscribed ",nuri);
+ delete all_branches[nuri];
+ }
}
- all_branches[nura] = already_subscribed;
+ all_branches[nuri] = already_subscribed;
}
let new_store = already_subscribed.increase();
diff --git a/ng-app/src/wallet_emojis.ts b/ng-app/src/wallet_emojis.ts
index 1336f1f..47dd420 100644
--- a/ng-app/src/wallet_emojis.ts
+++ b/ng-app/src/wallet_emojis.ts
@@ -443,7 +443,7 @@ let face = [
{
hexcode: "1f6a3",
shortcode: "person_rowing_boat",
- code: "boat",
+ code: "rowing_boat",
},
{
hexcode: "1f3ca",
diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs
index 4c7d348..6c2930b 100644
--- a/ng-broker/src/rocksdb_server_storage.rs
+++ b/ng-broker/src/rocksdb_server_storage.rs
@@ -260,6 +260,7 @@ impl RocksDbServerStorage {
TopicStorage::get_all_heads(&mut model)?,
publisher,
topic,
+ TopicStorage::COMMITS_NBR.get(&mut model)?,
)),
}
}
@@ -351,6 +352,7 @@ impl RocksDbServerStorage {
TopicStorage::get_all_heads(&mut topic_storage)?,
true,
*topic_id,
+ TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
),
);
}
@@ -372,6 +374,7 @@ impl RocksDbServerStorage {
TopicStorage::get_all_heads(&mut topic_storage)?,
false,
*topic,
+ TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
));
}
result.extend(rw_topics_added.into_values());
@@ -401,6 +404,7 @@ impl RocksDbServerStorage {
TopicStorage::get_all_heads(&mut topic_storage)?,
false,
*topic,
+ TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
));
}
}
@@ -478,6 +482,7 @@ impl RocksDbServerStorage {
TopicStorage::get_all_heads(&mut topic_storage)?,
is_publisher,
*topic,
+ TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
))
}
@@ -640,6 +645,8 @@ impl RocksDbServerStorage {
let head = HashSet::from([commit_id]);
//TODO: current_heads in TopicInfo in ServerBroker is not updated (but it isn't used so far)
TopicStorage::HEADS.remove_from_set_and_add(&mut topic_storage, past, head)?;
+
+ TopicStorage::COMMITS_NBR.increment(&mut topic_storage)?;
}
}
@@ -652,6 +659,7 @@ impl RocksDbServerStorage {
topic: &TopicId,
known_heads: &Vec<ObjectId>,
target_heads: &Vec<ObjectId>,
+ known_commits: &Option<BloomFilter>,
) -> Result<Vec<TopicSyncRes>, ServerError> {
let overlay = self.check_overlay(overlay)?;
// quick solution for now using the Branch::sync_req. TODO: use the saved references (ACKS,DEPS) in the server_storage, to have much quicker responses
@@ -670,7 +678,7 @@ impl RocksDbServerStorage {
let store = Store::new_from_overlay_id(&overlay, Arc::clone(&self.block_storage));
- let commits = Branch::sync_req(target_heads, known_heads, &store)
+ let commits = Branch::sync_req(target_heads, known_heads, known_commits, &store)
.map_err(|_| ServerError::MalformedBranch)?;
let mut result = Vec::with_capacity(commits.len());
diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs
index 2056cf3..a70b285 100644
--- a/ng-broker/src/server_broker.rs
+++ b/ng-broker/src/server_broker.rs
@@ -312,6 +312,19 @@ impl IServerBroker for ServerBroker {
self.storage.get_commit(overlay, id)
}
+ fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey) {
+ for ((overlay, topic), peers) in self.local_subscriptions.iter_mut() {
+ if peers.remove(remote_peer) {
+ log_debug!(
+ "subscription of peer {} to topic {} in overlay {} removed",
+ remote_peer,
+ topic,
+ overlay
+ );
+ }
+ }
+ }
+
fn dispatch_event(
&self,
overlay: &OverlayId,
@@ -344,8 +357,9 @@ impl IServerBroker for ServerBroker {
topic: &TopicId,
known_heads: &Vec<ObjectId>,
target_heads: &Vec<ObjectId>,
+ known_commits: &Option<BloomFilter>,
) -> Result<Vec<TopicSyncRes>, ServerError> {
self.storage
- .topic_sync_req(overlay, topic, known_heads, target_heads)
+ .topic_sync_req(overlay, topic, known_heads, target_heads, known_commits)
}
}
diff --git a/ng-broker/src/server_storage/core/topic.rs b/ng-broker/src/server_storage/core/topic.rs
index a8264c4..ddc4a2a 100644
--- a/ng-broker/src/server_storage/core/topic.rs
+++ b/ng-broker/src/server_storage/core/topic.rs
@@ -53,6 +53,7 @@ impl<'a> TopicStorage<'a> {
pub const ADVERT: SingleValueColumn = SingleValueColumn::new(b'a');
pub const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r');
pub const ROOT_COMMIT: SingleValueColumn = SingleValueColumn::new(b'o');
+ pub const COMMITS_NBR: CounterValue = CounterValue::new(b'n');
// Topic <-> Users who pinned it (with boolean: R or W)
pub const USERS: MultiMapColumn = MultiMapColumn::new(b'u');
@@ -63,7 +64,11 @@ impl<'a> TopicStorage<'a> {
"Topic",
Some(Self::PREFIX),
Some(&Self::REPO),
- &[&Self::ADVERT as &dyn ISingleValueColumn, &Self::ROOT_COMMIT],
+ &[
+ &Self::ADVERT as &dyn ISingleValueColumn,
+ &Self::ROOT_COMMIT,
+ &Self::COMMITS_NBR,
+ ],
&[&Self::USERS as &dyn IMultiValueColumn, &Self::HEADS],
);
diff --git a/ng-net/src/actors/client/event.rs b/ng-net/src/actors/client/event.rs
index 154ebcd..90c86f9 100644
--- a/ng-net/src/actors/client/event.rs
+++ b/ng-net/src/actors/client/event.rs
@@ -82,7 +82,6 @@ impl EActor for Actor<'_, PublishEvent, ()> {
// send a ProtocolError if invalid signatures (will disconnect the client)
req.event().verify()?;
- let broker = BROKER.read().await;
let overlay = req.overlay().clone();
let (user_id, remote_peer) = {
let fsm = _fsm.lock().await;
@@ -91,10 +90,12 @@ impl EActor for Actor<'_, PublishEvent, ()> {
fsm.remote_peer().ok_or(ProtocolError::ActorError)?,
)
};
- let res = broker
- .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer)
- .await;
-
+ let res = {
+ let mut broker = BROKER.write().await;
+ broker
+ .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer)
+ .await
+ };
_fsm.lock()
.await
.send_in_reply_to(res.into(), self.id())
diff --git a/ng-net/src/actors/client/topic_sync_req.rs b/ng-net/src/actors/client/topic_sync_req.rs
index c11de9f..97870a5 100644
--- a/ng-net/src/actors/client/topic_sync_req.rs
+++ b/ng-net/src/actors/client/topic_sync_req.rs
@@ -34,6 +34,7 @@ impl TopicSyncReq {
known_heads: vec![],
target_heads: vec![],
overlay: Some(*overlay),
+ known_commits: None,
})
}
@@ -42,12 +43,14 @@ impl TopicSyncReq {
topic_id: &TopicId,
known_heads: Vec<ObjectId>,
target_heads: Vec<ObjectId>,
+ known_commits: Option<BloomFilter>,
) -> TopicSyncReq {
TopicSyncReq::V0(TopicSyncReqV0 {
topic: *topic_id,
known_heads,
target_heads,
overlay: Some(repo.store.get_store_repo().overlay_id_for_read_purpose()),
+ known_commits,
})
}
}
@@ -111,6 +114,7 @@ impl EActor for Actor<'_, TopicSyncReq, TopicSyncRes> {
req.topic(),
req.known_heads(),
req.target_heads(),
+ req.known_commits(),
);
// IF NEEDED, the topic_sync_req could be changed to return a stream, and then the send_in_reply_to would be also totally async
diff --git a/ng-net/src/broker.rs b/ng-net/src/broker.rs
index bcb0d24..f1a0baf 100644
--- a/ng-net/src/broker.rs
+++ b/ng-net/src/broker.rs
@@ -336,6 +336,11 @@ impl Broker {
let _ = self.remove_user_peer(&user, &peer_id);
}
}
+ let peer = PubKey::X25519PubKey(peer_id);
+ log_debug!("unsubscribing peer {}", peer);
+ self.get_server_broker_mut()
+ .unwrap()
+ .remove_all_subscriptions_of_peer(&peer);
}
}
PeerConnection::Core(ip) => {
@@ -834,7 +839,7 @@ impl Broker {
local_broker.write().await.user_disconnected(user).await;
}
} else {
- log_info!("REMOVED");
+ log_debug!("REMOVED");
BROKER
.write()
.await
@@ -879,7 +884,7 @@ impl Broker {
#[cfg(not(target_arch = "wasm32"))]
pub async fn dispatch_event(
- &self,
+ &mut self,
overlay: &OverlayId,
event: Event,
user_id: &UserId,
@@ -887,12 +892,12 @@ impl Broker {
) -> Result<(), ServerError> {
// TODO: deal with subscriptions on the outer overlay. for now we assume everything is on the inner overlay
- let peers_for_local_dispatch = self.get_server_broker()?.dispatch_event(
- overlay,
- event.clone(),
- user_id,
- remote_peer,
- )?;
+ let peers_for_local_dispatch: Vec<PubKey> = self
+ .get_server_broker()?
+ .dispatch_event(overlay, event.clone(), user_id, remote_peer)?
+ .into_iter()
+ .cloned()
+ .collect();
//log_debug!("dispatch_event {:?}", peers_for_local_dispatch);
@@ -901,7 +906,7 @@ impl Broker {
if let Some(BrokerPeerInfo {
connected: PeerConnection::Client(ConnectionBase { fsm: Some(fsm), .. }),
..
- }) = self.peers.get(&(None, peer.to_owned().to_dh()))
+ }) = self.peers.get(&(None, peer.to_dh()))
{
//log_debug!("ForwardedEvent peer {:?}", peer);
let _ = fsm
@@ -915,6 +920,10 @@ impl Broker {
},
)))
.await;
+ } else {
+ // we remove the peer from all local_subscriptions
+ self.get_server_broker_mut()?
+ .remove_all_subscriptions_of_peer(&peer);
}
}
diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs
index e9970e7..43f8340 100644
--- a/ng-net/src/server_broker.rs
+++ b/ng-net/src/server_broker.rs
@@ -91,11 +91,14 @@ pub trait IServerBroker: Send + Sync {
remote_peer: &PubKey,
) -> Result<Vec<&PubKey>, ServerError>;
+ fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey);
+
fn topic_sync_req(
&self,
overlay: &OverlayId,
topic: &TopicId,
known_heads: &Vec<ObjectId>,
target_heads: &Vec<ObjectId>,
+ known_commits: &Option<BloomFilter>,
) -> Result<Vec<TopicSyncRes>, ServerError>;
}
diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs
index c5e1296..cd69217 100644
--- a/ng-net/src/types.rs
+++ b/ng-net/src/types.rs
@@ -1716,6 +1716,9 @@ pub struct TopicSyncReqV0 {
/// if empty, the local HEAD at the responder is used instead
pub target_heads: Vec<ObjectId>,
+ /// optional Bloom filter of all the commit IDs present locally (used in case of detected fork)
+ pub known_commits: Option<BloomFilter>,
+
#[serde(skip)]
pub overlay: Option<OverlayId>,
}
@@ -1752,6 +1755,11 @@ impl TopicSyncReq {
TopicSyncReq::V0(o) => &o.target_heads,
}
}
+ pub fn known_commits(&self) -> &Option<BloomFilter> {
+ match self {
+ TopicSyncReq::V0(o) => &o.known_commits,
+ }
+ }
}
/// Status of a Forwarded Peer, sent in the Advert
@@ -3380,6 +3388,7 @@ pub struct TopicSubResV0 {
pub topic: TopicId,
pub known_heads: Vec<ObjectId>,
pub publisher: bool,
+ pub commits_nbr: u64,
}
/// Topic subscription response
@@ -3401,11 +3410,17 @@ impl TopicSubRes {
Self::V0(v0) => v0.publisher,
}
}
- pub fn new_from_heads(topics: HashSet<ObjectId>, publisher: bool, topic: TopicId) -> Self {
+ pub fn new_from_heads(
+ topics: HashSet<ObjectId>,
+ publisher: bool,
+ topic: TopicId,
+ commits_nbr: u64,
+ ) -> Self {
TopicSubRes::V0(TopicSubResV0 {
topic,
known_heads: topics.into_iter().collect(),
publisher,
+ commits_nbr,
})
}
pub fn known_heads(&self) -> &Vec<ObjectId> {
@@ -3413,6 +3428,11 @@ impl TopicSubRes {
Self::V0(v0) => &v0.known_heads,
}
}
+ pub fn commits_nbr(&self) -> u64 {
+ match self {
+ Self::V0(v0) => v0.commits_nbr,
+ }
+ }
}
impl From<TopicId> for TopicSubRes {
@@ -3421,6 +3441,7 @@ impl From<TopicId> for TopicSubRes {
topic,
known_heads: vec![],
publisher: false,
+ commits_nbr: 0,
})
}
}
@@ -3431,6 +3452,7 @@ impl From for TopicSubRes {
topic: topic.topic_id().clone(),
known_heads: vec![],
publisher: true,
+ commits_nbr: 0,
})
}
}
diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs
index d3b24f5..5380ab7 100644
--- a/ng-repo/src/branch.rs
+++ b/ng-repo/src/branch.rs
@@ -13,8 +13,8 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
+use fastbloom_rs::{BloomFilter as Filter, Membership};
use zeroize::Zeroize;
-// use fastbloom_rs::{BloomFilter as Filter, Membership};
use crate::errors::*;
#[allow(unused_imports)]
@@ -165,12 +165,20 @@ impl Branch {
missing: &mut Option<&mut HashSet<ObjectId>>,
future: Option<ObjectId>,
theirs_found: &mut Option<&mut HashSet<ObjectId>>,
+ theirs_filter: &Option<Filter>,
) -> Result<(), ObjectParseError> {
let id = cobj.id();
// check if this commit object is present in theirs or has already been visited in the current walk
// load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
- if !theirs.contains(&id) {
+
+ let found_in_filter = if let Some(filter) = theirs_filter {
+ filter.contains(id.slice())
+ } else {
+ false
+ };
+
+ if !found_in_filter && !theirs.contains(&id) {
if let Some(past) = visited.get_mut(&id) {
// we update the future
if let Some(f) = future {
@@ -193,6 +201,7 @@ impl Branch {
missing,
Some(id),
theirs_found,
+ theirs_filter,
)?;
}
Err(ObjectParseError::MissingBlocks(blocks)) => {
@@ -219,13 +228,9 @@ impl Branch {
pub fn sync_req(
target_heads: impl Iterator<Item = ObjectId>,
known_heads: &[ObjectId],
- //their_filter: &BloomFilter,
+ known_commits: &Option<BloomFilter>,
store: &Store,
) -> Result<Vec<ObjectId>, ObjectParseError> {
- //log_debug!(">> sync_req");
- //log_debug!(" target_heads: {:?}", target_heads);
- //log_debug!(" known_heads: {:?}", known_heads);
-
// their commits
let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();
@@ -240,6 +245,7 @@ impl Branch {
&mut None,
None,
&mut None,
+ &None,
)?;
}
// we silently discard any load error on the known_heads as the responder might not know them (yet).
@@ -249,6 +255,10 @@ impl Branch {
let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();
+ let filter = known_commits
+ .as_ref()
+ .map(|their_filter| Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k));
+
// collect all commits reachable from target_heads
// up to the root or until encountering a commit from theirs
for id in target_heads {
@@ -261,27 +271,12 @@ impl Branch {
&mut None,
None,
&mut None,
+ &filter,
)?;
}
// we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.
}
- //log_debug!("!! ours: {:?}", ours);
- //log_debug!("!! theirs: {:?}", theirs);
-
- // remove their_commits from result
- // let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into());
- // for id in result.clone() {
- // match id {
- // Digest::Blake3Digest32(d) => {
- // if filter.contains(&d) {
- // result.remove(&id);
- // }
- // }
- // }
- // }
- //log_debug!("!! result filtered: {:?}", result);
-
// now ordering to respect causal partial order.
let mut next_generations = HashSet::new();
for (_, node) in visited.iter() {
@@ -297,6 +292,11 @@ impl Branch {
result.append(&mut DagNode::collapse(first, &visited));
}
+ // #[cfg(debug_assertions)]
+ // for _res in result.iter() {
+ // log_debug!("sending missing commit {}", _res);
+ // }
+
Ok(result)
}
}
@@ -574,7 +574,7 @@ mod test {
let ids = Branch::sync_req(
[t5.id, a6.id, a7.id].into_iter(),
&[t5.id],
- //&their_commits,
+ &None,
&repo.store,
)
.unwrap();
diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs
index 178964c..623ce50 100644
--- a/ng-repo/src/errors.rs
+++ b/ng-repo/src/errors.rs
@@ -76,6 +76,7 @@ pub enum NgError {
WrongUploadId,
FileError(FileError),
InternalError,
+ OxiGraphError(String),
}
impl Error for NgError {}
diff --git a/ng-repo/src/kcv_storage.rs b/ng-repo/src/kcv_storage.rs
index 170042c..ba3efc4 100644
--- a/ng-repo/src/kcv_storage.rs
+++ b/ng-repo/src/kcv_storage.rs
@@ -674,22 +674,6 @@ pub trait IMultiValueColumn {
fn value_size(&self) -> Result;
}
-pub struct ExistentialValueColumn {
- suffix: u8,
-}
-
-impl ISingleValueColumn for ExistentialValueColumn {
- fn suffix(&self) -> u8 {
- self.suffix
- }
-}
-
-impl ExistentialValueColumn {
- pub const fn new(suffix: u8) -> Self {
- ExistentialValueColumn { suffix }
- }
-}
-
pub struct SingleValueColumn Deserialize<'a>> {
suffix: u8,
phantom_value: PhantomData,
@@ -779,6 +763,108 @@ impl Deserialize<'d>>
}
}
+///////////// Counter Value
+
+pub struct CounterValue<Model: IModel> {
+ suffix: u8,
+ phantom_model: PhantomData<Model>,
+}
+
+impl<Model: IModel> ISingleValueColumn for CounterValue<Model> {
+ fn suffix(&self) -> u8 {
+ self.suffix
+ }
+}
+
+impl<Model: IModel> CounterValue<Model> {
+ pub const fn new(suffix: u8) -> Self {
+ CounterValue {
+ suffix,
+ phantom_model: PhantomData,
+ }
+ }
+
+ pub fn increment(&self, model: &mut Model) -> Result<(), StorageError> {
+ model.storage().write_transaction(&mut |tx| {
+ let mut val: u64 = match tx.get(model.prefix(), model.key(), Some(self.suffix), &None) {
+ Ok(val_ser) => from_slice(&val_ser)?,
+ Err(StorageError::NotFound) => 0,
+ Err(e) => return Err(e),
+ };
+ val += 1;
+ let val_ser = to_vec(&val)?;
+ tx.put(
+ model.prefix(),
+ model.key(),
+ Some(self.suffix),
+ &val_ser,
+ &None,
+ )?;
+ Ok(())
+ })
+ }
+ /// returns true if the counter reached zero, and the property was removed
+ pub fn decrement(&self, model: &mut Model) -> Result<bool, StorageError> {
+ let mut ret: bool = false;
+ model.storage().write_transaction(&mut |tx| {
+ let val_ser = tx.get(model.prefix(), model.key(), Some(self.suffix), &None)?;
+ let mut val: u64 = from_slice(&val_ser)?;
+ val -= 1;
+ ret = val == 0;
+ if ret {
+ tx.del(model.prefix(), model.key(), Some(self.suffix), &None)?;
+ } else {
+ let val_ser = to_vec(&val)?;
+ tx.put(
+ model.prefix(),
+ model.key(),
+ Some(self.suffix),
+ &val_ser,
+ &None,
+ )?;
+ }
+ Ok(())
+ })?;
+ Ok(ret)
+ }
+
+ pub fn get(&self, model: &mut Model) -> Result<u64, StorageError> {
+ let val_res = model
+ .storage()
+ .get(model.prefix(), model.key(), Some(self.suffix), &None);
+ match val_res {
+ Ok(val_ser) => Ok(from_slice(&val_ser)?),
+ Err(StorageError::NotFound) => Ok(0),
+ Err(e) => Err(e),
+ }
+ }
+
+ pub fn del(&self, model: &mut Model) -> Result<(), StorageError> {
+ model.check_exists()?;
+ model
+ .storage()
+ .del(model.prefix(), model.key(), Some(self.suffix), &None)
+ }
+}
+
+////////////////
+
+pub struct ExistentialValueColumn {
+ suffix: u8,
+}
+
+impl ISingleValueColumn for ExistentialValueColumn {
+ fn suffix(&self) -> u8 {
+ self.suffix
+ }
+}
+
+impl ExistentialValueColumn {
+ pub const fn new(suffix: u8) -> Self {
+ ExistentialValueColumn { suffix }
+ }
+}
+
pub struct ExistentialValue Deserialize<'d>> {
value: Option,
value_ser: Vec,
diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs
index b9c2b0b..b0d9b57 100644
--- a/ng-repo/src/repo.rs
+++ b/ng-repo/src/repo.rs
@@ -98,6 +98,8 @@ pub struct BranchInfo {
pub read_cap: ReadCap,
pub current_heads: Vec,
+
+ pub commits_nbr: u64,
}
/// In memory Repository representation. With helper functions that access the underlying UserStore and keeps proxy of the values
@@ -173,6 +175,7 @@ impl Repo {
}
branch.current_heads = set.into_iter().cloned().collect();
branch.current_heads.push(commit_ref);
+ branch.commits_nbr += 1;
// we return the new current heads
Ok(branch.current_heads.to_vec())
} else {
diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs
index c55400e..9329a70 100644
--- a/ng-repo/src/store.rs
+++ b/ng-repo/src/store.rs
@@ -256,6 +256,7 @@ impl Store {
topic_priv_key: Some(branch_topic_priv_key),
read_cap: branch_read_cap,
current_heads: vec![],
+ commits_nbr: 0,
};
Ok((branch_commit, add_branch_commit, branch_info))
@@ -609,6 +610,7 @@ impl Store {
topic_priv_key: Some(topic_priv_key),
read_cap: root_branch_readcap.clone(),
current_heads: vec![sync_sig_on_root_branch_commit_ref],
+ commits_nbr: 0,
};
branches.push((root_branch.id, root_branch));
diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs
index 62cb24b..e97c87b 100644
--- a/ng-repo/src/types.rs
+++ b/ng-repo/src/types.rs
@@ -43,6 +43,11 @@ impl Digest {
pub fn from_slice(slice: [u8; 32]) -> Digest {
Digest::Blake3Digest32(slice)
}
+ pub fn slice(&self) -> &[u8; 32] {
+ match self {
+ Self::Blake3Digest32(o) => o,
+ }
+ }
}
impl fmt::Display for Digest {
diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs
index 57d9876..d0abb67 100644
--- a/ng-sdk-js/src/lib.rs
+++ b/ng-sdk-js/src/lib.rs
@@ -585,6 +585,17 @@ pub async fn doc_fetch_private_subscribe() -> Result {
Ok(serde_wasm_bindgen::to_value(&request).unwrap())
}
+#[cfg(target_arch = "wasm32")]
+#[wasm_bindgen]
+pub async fn doc_fetch_repo_subscribe(repo_id: String) -> Result {
+ let request = AppRequest::V0(AppRequestV0 {
+ command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)),
+ nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?,
+ payload: None,
+ });
+ Ok(serde_wasm_bindgen::to_value(&request).unwrap())
+}
+
// #[cfg(target_arch = "wasm32")]
// #[wasm_bindgen]
// pub async fn get_readcap() -> Result {
diff --git a/ng-verifier/Cargo.toml b/ng-verifier/Cargo.toml
index 87a5107..edf9489 100644
--- a/ng-verifier/Cargo.toml
+++ b/ng-verifier/Cargo.toml
@@ -36,6 +36,7 @@ web-time = "0.2.0"
either = "1.8.1"
futures = "0.3.24"
async-trait = "0.1.64"
+fastbloom-rs = "0.5.3"
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" }
diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs
index 3d30eb1..46d7ff4 100644
--- a/ng-verifier/src/commits/mod.rs
+++ b/ng-verifier/src/commits/mod.rs
@@ -111,6 +111,7 @@ impl CommitVerifier for RootBranch {
topic_priv_key,
read_cap: reference.clone(),
current_heads: vec![reference.clone()],
+ commits_nbr: 1,
};
let id = root_branch.id;
let branches = vec![(root_branch.id, root_branch)];
@@ -251,6 +252,7 @@ impl CommitVerifier for AddBranch {
topic_priv_key: None,
read_cap: v0.branch_read_cap.clone(),
current_heads: vec![],
+ commits_nbr: 0,
};
verifier.add_branch_and_save(
diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs
index 05b4c0b..7dfa48d 100644
--- a/ng-verifier/src/request_processor.rs
+++ b/ng-verifier/src/request_processor.rs
@@ -130,6 +130,14 @@ impl AppRequestCommandV0 {
};
Ok((repo_id, branch, store_repo))
}
+ NuriTargetV0::Repo(repo_id) => {
+ let (branch, store_repo) = {
+ let repo = verifier.repos.get(repo_id).ok_or(NgError::RepoNotFound)?;
+ let branch = repo.main_branch().ok_or(NgError::BranchNotFound)?;
+ (branch.id, repo.store.get_store_repo().clone())
+ };
+ Ok((*repo_id, branch, store_repo))
+ }
_ => unimplemented!(),
}
}
diff --git a/ng-verifier/src/types.rs b/ng-verifier/src/types.rs
index bbb86f2..0ca4358 100644
--- a/ng-verifier/src/types.rs
+++ b/ng-verifier/src/types.rs
@@ -221,6 +221,20 @@ pub struct NuriV0 {
}
impl NuriV0 {
+ pub fn new_repo_target_from_string(repo_id_string: String) -> Result<Self, NgError> {
+ let repo_id: RepoId = repo_id_string.as_str().try_into()?;
+ Ok(Self {
+ target: NuriTargetV0::Repo(repo_id),
+ entire_store: false,
+ object: None,
+ branch: None,
+ overlay: None,
+ access: vec![],
+ topic: None,
+ locator: vec![],
+ })
+ }
+
pub fn new_private_store_target() -> Self {
Self {
target: NuriTargetV0::PrivateStore,
diff --git a/ng-verifier/src/user_storage/branch.rs b/ng-verifier/src/user_storage/branch.rs
index 0170119..c851821 100644
--- a/ng-verifier/src/user_storage/branch.rs
+++ b/ng-verifier/src/user_storage/branch.rs
@@ -15,6 +15,8 @@ use serde_bare::to_vec;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::prop;
use ng_repo::kcv_storage::KCVStorage;
+#[allow(unused_imports)]
+use ng_repo::log::*;
use ng_repo::repo::BranchInfo;
use ng_repo::types::*;
@@ -33,8 +35,15 @@ impl<'a> BranchStorage<'a> {
const PUBLISHER: u8 = b'p';
const READ_CAP: u8 = b'r';
const TOPIC: u8 = b't';
+ const COMMITS_NBR: u8 = b'n';
- const ALL_PROPERTIES: [u8; 4] = [Self::TYPE, Self::PUBLISHER, Self::READ_CAP, Self::TOPIC];
+ const ALL_PROPERTIES: [u8; 5] = [
+ Self::TYPE,
+ Self::PUBLISHER,
+ Self::READ_CAP,
+ Self::TOPIC,
+ Self::COMMITS_NBR,
+ ];
const PREFIX_HEADS: u8 = b'h';
@@ -136,6 +145,7 @@ impl<'a> BranchStorage<'a> {
topic: prop(Self::TOPIC, &props)?,
topic_priv_key: prop(Self::PUBLISHER, &props).ok(),
current_heads: Self::get_all_heads(id, storage)?,
+ commits_nbr: prop(Self::COMMITS_NBR, &props).unwrap_or(0),
};
Ok(bs)
}
@@ -227,6 +237,28 @@ impl<'a> BranchStorage<'a> {
key.append(&mut head_ser);
tx.put(Self::PREFIX_HEADS, &key, None, &vec![], &None)?;
}
+
+ let mut val: u64 = match tx.get(Self::PREFIX, id_ser, Some(Self::COMMITS_NBR), &None) {
+ Ok(val_ser) => from_slice(&val_ser)?,
+ Err(StorageError::NotFound) => 0,
+ Err(e) => return Err(e),
+ };
+ val += 1;
+ let val_ser = to_vec(&val)?;
+ tx.put(
+ Self::PREFIX,
+ id_ser,
+ Some(Self::COMMITS_NBR),
+ &val_ser,
+ &None,
+ )?;
+ // log_info!(
+ // "putting commit_nbr {} {:?} {} {:?}",
+ // Self::PREFIX as char,
+ // id_ser,
+ // Self::COMMITS_NBR as char,
+ // val_ser
+ // );
Ok(())
})
}
diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs
index 09bc507..9930e28 100644
--- a/ng-verifier/src/verifier.rs
+++ b/ng-verifier/src/verifier.rs
@@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc};
use async_std::stream::StreamExt;
use async_std::sync::{Mutex, RwLockReadGuard};
+use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Hashes, Membership};
use futures::channel::mpsc;
use futures::SinkExt;
use ng_repo::object::Object;
@@ -191,7 +192,7 @@ impl Verifier {
// );
if let Some(sender) = self.branch_subscriptions.get_mut(branch) {
if sender.is_closed() {
- //log_info!("closed so removed");
+ log_info!("closed so removed {}", branch);
self.branch_subscriptions.remove(branch);
} else {
let _ = sender.send(response).await;
@@ -227,7 +228,7 @@ impl Verifier {
}
let fnonce = Box::new(move || {
- //log_info!("CLOSE_CHANNEL");
+ log_info!("CLOSE_CHANNEL of subscription for branch {}", branch);
if !tx.is_closed() {
tx.close_channel();
}
@@ -739,7 +740,7 @@ impl Verifier {
.ok_or(NgError::TopicNotFound)?
.to_owned();
- self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref);
+ self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref)?;
if self.connected_server_id.is_some() {
// send the event to the server already
@@ -965,6 +966,7 @@ impl Verifier {
branch,
repo_id,
topic.known_heads(),
+ topic.commits_nbr(),
)
.await?;
break;
@@ -999,6 +1001,7 @@ impl Verifier {
branch,
repo_id,
topic.known_heads(),
+ topic.commits_nbr(),
)
.await?;
break;
@@ -1042,6 +1045,7 @@ impl Verifier {
branch,
repo_id,
sub.known_heads(),
+ sub.commits_nbr(),
)
.await?;
}
@@ -1167,7 +1171,7 @@ impl Verifier {
if res.is_ok() && !skip_heads_update {
let commit_ref = commit.reference().unwrap();
let past = commit.direct_causal_past();
- self.update_branch_current_heads(repo_id, branch_id, past, commit_ref);
+ self.update_branch_current_heads(repo_id, branch_id, past, commit_ref)?;
Ok(())
} else {
res
@@ -1291,8 +1295,14 @@ impl Verifier {
branch_id: &BranchId,
repo_id: &RepoId,
remote_heads: &Vec<ObjectId>,
+ remote_commits_nbr: u64,
) -> Result<(), NgError> {
let (store, msg, branch_secret) = {
+ if remote_commits_nbr == 0 || remote_heads.is_empty() {
+ log_info!("branch is new on the broker. doing nothing");
+ return Ok(());
+ }
+
let repo = self.repos.get(repo_id).unwrap();
let branch_info = repo.branch(branch_id)?;
@@ -1302,10 +1312,7 @@ impl Verifier {
let ours_set: HashSet<ObjectId> = HashSet::from_iter(ours.clone());
let theirs = HashSet::from_iter(remote_heads.clone().into_iter());
- if theirs.is_empty() {
- log_info!("branch is new on the broker. doing nothing");
- return Ok(());
- }
+
if ours_set.difference(&theirs).count() == 0
&& theirs.difference(&ours_set).count() == 0
{
@@ -1326,6 +1333,7 @@ impl Verifier {
&mut None,
None,
&mut Some(&mut theirs_found),
+ &None,
);
}
}
@@ -1333,16 +1341,33 @@ impl Verifier {
let theirs_not_found: Vec<ObjectId> =
theirs.difference(&theirs_found).cloned().collect();
- if theirs_not_found.is_empty() {
+ let known_commits = if theirs_not_found.is_empty() {
return Ok(());
} else {
- // prepare bloom filter
- }
+ if visited.is_empty() {
+ None
+ } else {
+ // prepare bloom filter
+ let expected_elements =
+ remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr);
+ let mut config = FilterBuilder::new(expected_elements, 0.01);
+ config.enable_repeat_insert(false);
+ let mut filter = Filter::new(config);
+ for commit_id in visited.keys() {
+ filter.add(commit_id.slice());
+ }
+ Some(BloomFilter {
+ k: filter.hashes(),
+ f: filter.get_u8_array().to_vec(),
+ })
+ }
+ };
let msg = TopicSyncReq::V0(TopicSyncReqV0 {
topic: branch_info.topic,
known_heads: ours_set.union(&theirs_found).into_iter().cloned().collect(),
target_heads: theirs_not_found,
+ known_commits,
overlay: Some(store.overlay_for_read_on_client_protocol()),
});
(store, msg, branch_info.read_cap.key.clone())
@@ -1359,6 +1384,8 @@ impl Verifier {
.event()
.open(&store, repo_id, branch_id, &branch_secret)?;
+ // TODO: deal with missing commits in the DAG (fetch them individually with CommitGet). This can happen because of false positive on BloomFilter
+
self.verify_commit(&commit, branch_id, repo_id, Arc::clone(&store))
.await?;
}
@@ -1940,7 +1967,7 @@ impl Verifier {
// and have oxigraph use directly the UserStorage
Some(
oxigraph::store::Store::open_with_key(path_oxi, config.user_master_key)
- .unwrap(),
+ .map_err(|e| NgError::OxiGraphError(e.to_string()))?,
),
Some(Box::new(RocksDbUserStorage::open(
&path_user,
diff --git a/ng-wallet/src/emojis.rs b/ng-wallet/src/emojis.rs
index 48e425a..0f20ed7 100644
--- a/ng-wallet/src/emojis.rs
+++ b/ng-wallet/src/emojis.rs
@@ -427,7 +427,7 @@ const sport: [EmojiDef<'static>; 15] = [
EmojiDef {
hexcode: "1f6a3",
shortcode: "person_rowing_boat",
- code: "boat",
+ code: "rowing_boat",
},
EmojiDef {
hexcode: "1f3ca",
diff --git a/ng-wallet/src/types.rs b/ng-wallet/src/types.rs
index 72953bd..630b86d 100644
--- a/ng-wallet/src/types.rs
+++ b/ng-wallet/src/types.rs
@@ -133,6 +133,7 @@ impl SessionWalletStorageV0 {
pub struct SessionInfo {
pub session_id: u64,
pub user: UserId,
+ pub private_store_id: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]