bloomfilters for branch sync_req + improved disconnection and subscription handling

pull/19/head
Niko PLP 8 months ago
parent 481c4a96fd
commit 30c36dafca
34 changed files (lines changed in parentheses):

  1. Cargo.lock (1)
  2. nextgraph/src/local_broker.rs (61)
  3. ng-app/src-tauri/src/lib.rs (11)
  4. ng-app/src/App.svelte (4)
  5. ng-app/src/api.ts (3)
  6. ng-app/src/lib/FullLayout.svelte (4)
  7. ng-app/src/lib/Login.svelte (4)
  8. ng-app/src/lib/Test.svelte (41)
  9. ng-app/src/routes/WalletLogin.svelte (30)
  10. ng-app/src/store.ts (24)
  11. ng-app/src/wallet_emojis.ts (2)
  12. ng-broker/src/rocksdb_server_storage.rs (10)
  13. ng-broker/src/server_broker.rs (16)
  14. ng-broker/src/server_storage/core/topic.rs (7)
  15. ng-net/src/actors/client/event.rs (11)
  16. ng-net/src/actors/client/topic_sync_req.rs (4)
  17. ng-net/src/broker.rs (27)
  18. ng-net/src/server_broker.rs (3)
  19. ng-net/src/types.rs (24)
  20. ng-repo/src/branch.rs (48)
  21. ng-repo/src/errors.rs (1)
  22. ng-repo/src/kcv_storage.rs (118)
  23. ng-repo/src/repo.rs (3)
  24. ng-repo/src/store.rs (2)
  25. ng-repo/src/types.rs (5)
  26. ng-sdk-js/src/lib.rs (11)
  27. ng-verifier/Cargo.toml (1)
  28. ng-verifier/src/commits/mod.rs (2)
  29. ng-verifier/src/request_processor.rs (8)
  30. ng-verifier/src/types.rs (14)
  31. ng-verifier/src/user_storage/branch.rs (34)
  32. ng-verifier/src/verifier.rs (51)
  33. ng-wallet/src/emojis.rs (2)
  34. ng-wallet/src/types.rs (1)

Cargo.lock (generated)

@@ -3429,6 +3429,7 @@ dependencies = [
  "blake3",
  "chacha20",
  "either",
+ "fastbloom-rs",
  "futures",
  "getrandom 0.2.10",
  "ng-net",

nextgraph/src/local_broker.rs

@@ -478,6 +478,21 @@ impl LocalBroker {
         }
     }

+    pub fn get_site_store_of_session(
+        &self,
+        session: &Session,
+        store_type: SiteStoreType,
+    ) -> Result<PubKey, NgError> {
+        match self.opened_wallets.get(&session.config.wallet_name()) {
+            Some(opened_wallet) => {
+                let user_id = session.config.user_id();
+                let site = opened_wallet.wallet.site(&user_id)?;
+                Ok(site.get_site_store_id(store_type))
+            }
+            None => Err(NgError::WalletNotFound),
+        }
+    }
+
     fn verifier_config_type_from_session_config(
         &self,
         config: &SessionConfig,

@@ -609,6 +624,8 @@ impl LocalBroker {
         mut config: SessionConfig,
         user_priv_key: Option<PrivKey>,
     ) -> Result<SessionInfo, NgError> {
+        let intermediary_step = user_priv_key.is_some();
+
         let broker = self;
         let wallet_name: String = config.wallet_name();

@@ -640,6 +657,9 @@ impl LocalBroker {
                     return Ok(SessionInfo {
                         session_id: *idx,
                         user: user_id,
+                        private_store_id: broker
+                            .get_site_store_of_session(sess, SiteStoreType::Private)?
+                            .to_string(),
                     });
                 }
             }

@@ -807,19 +827,28 @@ impl LocalBroker {
                 //load verifier from local_storage (if rocks_db)
                 let _ = verifier.load();
-                broker.opened_sessions_list.push(Some(Session {
+                let session = Session {
                     config,
                     peer_key: session.peer_key.clone(),
                     last_wallet_nonce: session.last_wallet_nonce,
                     verifier,
-                }));
+                };
+                let private_store_id = if intermediary_step {
+                    "".to_string()
+                } else {
+                    broker
+                        .get_site_store_of_session(&session, SiteStoreType::Private)?
+                        .to_string()
+                };
+                broker.opened_sessions_list.push(Some(session));
                 let idx = broker.opened_sessions_list.len() - 1;
                 broker.opened_sessions.insert(user_id, idx as u64);

                 Ok(SessionInfo {
                     session_id: idx as u64,
                     user: user_id,
+                    private_store_id,
                 })
             }
         }

@@ -1031,7 +1060,7 @@ pub async fn wallet_create_v0(params: CreateWalletV0) -> Result<CreateWalletResu
         intermediate.in_memory,
     )?;
-    let session_info = broker
+    let mut session_info = broker
         .session_start(session_config, Some(intermediate.user_privkey.clone()))
         .await?;

@@ -1053,6 +1082,16 @@ pub async fn wallet_create_v0(params: CreateWalletV0) -> Result<CreateWalletResu
         .unwrap()
         .wallet
         .complete_with_site_and_brokers(site, brokers);

+    session_info.private_store_id = broker
+        .get_site_store_of_session(
+            broker.opened_sessions_list[session_info.session_id as usize]
+                .as_ref()
+                .unwrap(),
+            SiteStoreType::Private,
+        )?
+        .to_string();
+
     res.session_id = session_info.session_id;
     Ok(res)
 }

@@ -1585,7 +1624,10 @@ pub async fn app_request_stream(
 }

 /// retrieves the ID of one of the 3 stores of the personal Site (3P: public, protected, or private)
-pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Result<PubKey, NgError> {
+pub async fn personal_site_store(
+    session_id: u64,
+    store_type: SiteStoreType,
+) -> Result<PubKey, NgError> {
     let broker = match LOCAL_BROKER.get() {
         None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized),
         Some(Ok(broker)) => broker.read().await,

@@ -1597,14 +1639,7 @@ pub async fn personal_site_store(session_id: u64, store: SiteStoreType) -> Resul
         .as_ref()
         .ok_or(NgError::SessionNotFound)?;

-    match broker.opened_wallets.get(&session.config.wallet_name()) {
-        Some(opened_wallet) => {
-            let user_id = session.config.user_id();
-            let site = opened_wallet.wallet.site(&user_id)?;
-            Ok(site.get_site_store_id(store))
-        }
-        None => Err(NgError::WalletNotFound),
-    }
+    broker.get_site_store_of_session(session, store_type)
 }

 #[doc(hidden)]

ng-app/src-tauri/src/lib.rs

@@ -292,6 +292,16 @@ async fn doc_fetch_private_subscribe() -> Result<AppRequest, String> {
     Ok(request)
 }

+#[tauri::command(rename_all = "snake_case")]
+async fn doc_fetch_repo_subscribe(repo_id: String) -> Result<AppRequest, String> {
+    let request = AppRequest::V0(AppRequestV0 {
+        command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)),
+        nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?,
+        payload: None,
+    });
+    Ok(request)
+}
+
 #[tauri::command(rename_all = "snake_case")]
 async fn app_request(
     session_id: u64,

@@ -499,6 +509,7 @@ impl AppBuilder {
             user_disconnect,
             client_info_rust,
             doc_fetch_private_subscribe,
+            doc_fetch_repo_subscribe,
             cancel_stream,
             app_request_stream,
             app_request,

ng-app/src/App.svelte

@@ -161,7 +161,7 @@
           break;
         case "opened":
           if (!$opened_wallets[event.data.wallet.id]) {
-            await tick();
+            //await tick();
             // console.log(
             //   "ADDING TO OPENED",
             //   event.data.wallet.id,

@@ -226,7 +226,7 @@
         w[value.id] = value.wallet;
         return w;
       });
-      await tick();
+      //await tick();
       //console.log("posting opened");
       wallet_channel.postMessage(
         {

ng-app/src/api.ts

@@ -33,7 +33,8 @@ const mapping = {
     "user_disconnect": ["user_id"],
     "app_request": ["session_id","request"],
     "test": [ ],
-    "doc_fetch_private_subscribe": []
+    "doc_fetch_private_subscribe": [],
+    "doc_fetch_repo_subscribe": ["repo_id"],
 }

ng-app/src/lib/FullLayout.svelte

@@ -200,4 +200,8 @@
   .full-layout {
     height: 100vh;
   }
+  main {
+    overflow: hidden;
+    overflow-wrap: break-word;
+  }
 </style>

ng-app/src/lib/Login.svelte

@@ -159,9 +159,9 @@
     const myWorker = new worker_import.default();
     myWorker.onerror = (e) => {
       console.error(e);
-      error = e;
+      error = "WebWorker error";
       step = "end";
-      dispatch("error", { error: e });
+      dispatch("error", { error });
     };
     myWorker.onmessageerror = (e) => {
       console.error(e);

ng-app/src/lib/Test.svelte

@@ -23,7 +23,7 @@
   let is_tauri = import.meta.env.TAURI_PLATFORM;

-  let files = branch_subs("ok");
+  let files = $active_session && branch_subs($active_session.private_store_id);

   let img_map = {};

@@ -253,25 +253,26 @@
     bind:this={fileinput}
   />
 </div>
-{#await files.load()}
-  <p>Currently loading...</p>
-{:then}
-  {#each $files as file}
-    <p>
-      {file.V0.File.name}
-      {#await get_img(file.V0.File) then url}
-        {#if url}
-          <img
-            src={url}
-            title={"did:ng" + file.V0.File.nuri}
-            alt={file.V0.File.name}
-          />
-        {/if}
-      {/await}
-    </p>
-  {/each}
-{/await}
+{#if files}
+  {#await files.load()}
+    <p>Currently loading...</p>
+  {:then}
+    {#each $files as file}
+      <p>
+        {file.V0.File.name}
+        {#await get_img(file.V0.File) then url}
+          {#if url}
+            <img
+              src={url}
+              title={"did:ng" + file.V0.File.nuri}
+              alt={file.V0.File.name}
+            />
+          {/if}
+        {/await}
+      </p>
+    {/each}
+  {/await}
+{/if}
 {/if}
 </div>

ng-app/src/routes/WalletLogin.svelte

@@ -56,21 +56,29 @@
   });

   opened_wallets_unsub = opened_wallets.subscribe(async (value) => {
     if (!$active_wallet && selected && value[selected]) {
-      await tick();
+      //await tick();
       active_wallet.set({ wallet: value[selected], id: selected });
     }
   });

   active_wallet_unsub = active_wallet.subscribe(async (value) => {
     if (value && value.wallet) {
       if (!$active_session) {
-        let session = await ng.session_start(
-          value.id,
-          value.wallet.V0.personal_site
-        );
-        //console.log(session);
-        if (session) {
-          set_active_session(session);
-          loggedin();
-        }
+        try {
+          let session = await ng.session_start(
+            value.id,
+            value.wallet.V0.personal_site
+          );
+          //console.log(session);
+          if (session) {
+            set_active_session(session);
+            loggedin();
+          }
+        } catch (e) {
+          error = e;
+          importing = false;
+          wallet = undefined;
+          selected = undefined;
+          active_wallet.set(undefined);
+        }
       } else {
         loggedin();

@@ -137,7 +145,7 @@
       let client = await ng.wallet_was_opened(event.detail.wallet);
       event.detail.wallet.V0.client = client;
     }
-    await tick();
+    //await tick();
     active_wallet.set(event.detail);
     // { wallet,
    //    id }

@@ -381,7 +389,7 @@
     position: absolute;
     left: 0;
     padding: 5px;
-    background-color: #ffffff70;
+    background-color: #ffffffd0;
     overflow-wrap: break-word;
   }
   .wallet-box:focus .securitytxt {

ng-app/src/store.ts

@@ -93,6 +93,11 @@ export const close_active_session = async function() {

   active_session.set(undefined);
   //console.log("setting active_session to undefined",get(active_session));

+  for (const branch of Object.keys(all_branches)) {
+    let sub = all_branches[branch];
+    sub.unsubscribe();
+  }
+
 }

 const can_connect = derived([active_wallet, active_session], ([$s1, $s2]) => [

@@ -168,7 +173,7 @@ can_connect.subscribe(async (value) => {
   }
 });

-export const branch_subs = function(nura) {
+export const branch_subs = function(nuri) {
     // console.log("branch_commits")
     // const { subscribe, set, update } = writable([]); // create the underlying writable store

@@ -176,7 +181,7 @@ export const branch_subs = function(nura) {
     //     return {
     //         load: async () => {
     //             console.log("load")
-    //             unsub = await ng.doc_sync_branch(nura, async (commit) => {
+    //             unsub = await ng.doc_sync_branch(nuri, async (commit) => {
     //                 console.log(commit);
     //                 update( (old) => {old.unshift(commit); return old;} )
     //             });

@@ -201,7 +206,7 @@ export const branch_subs = function(nura) {
     return {
         load: async () => {
             //console.log("load upper");
-            let already_subscribed = all_branches[nura];
+            let already_subscribed = all_branches[nuri];
             if (!already_subscribed) return;
             if (already_subscribed.load) {
                 //console.log("doing the load");

@@ -213,7 +218,7 @@ export const branch_subs = function(nura) {
         },
         subscribe: (run, invalid) => {

-            let already_subscribed = all_branches[nura];
+            let already_subscribed = all_branches[nuri];
             if (!already_subscribed) {
                 const { subscribe, set, update } = writable([]); // create the underlying writable store
                 let count = 0;

@@ -230,7 +235,7 @@ export const branch_subs = function(nura) {
                         unsub();
                         unsub = () => {};
                         set([]);
-                        unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_private_subscribe(),
+                        unsub = await ng.app_request_stream(session.session_id, await ng.doc_fetch_repo_subscribe(nuri),
                             async (commit) => {
                                 //console.log("GOT APP RESPONSE", commit);
                                 update( (old) => {old.unshift(commit); return old;} )

@@ -253,11 +258,16 @@ export const branch_subs = function(nura) {
                     // if (count == 0) {
                     //     unsub();
                     //     console.log("removed sub");
-                    //     delete all_branches[nura];
+                    //     delete all_branches[nuri];
                     // }
                 },
+                unsubscribe: () => {
+                    unsub();
+                    console.log("unsubscribed ", nuri);
+                    delete all_branches[nuri];
+                }
             }
-            all_branches[nura] = already_subscribed;
+            all_branches[nuri] = already_subscribed;
         }

         let new_store = already_subscribed.increase();

ng-app/src/wallet_emojis.ts

@@ -443,7 +443,7 @@ let face = [
     {
         hexcode: "1f6a3",
         shortcode: "person_rowing_boat",
-        code: "boat",
+        code: "rowing_boat",
     },
     {
         hexcode: "1f3ca",

ng-broker/src/rocksdb_server_storage.rs

@@ -260,6 +260,7 @@ impl RocksDbServerStorage {
                 TopicStorage::get_all_heads(&mut model)?,
                 publisher,
                 topic,
+                TopicStorage::COMMITS_NBR.get(&mut model)?,
             )),
         }
     }

@@ -351,6 +352,7 @@ impl RocksDbServerStorage {
                     TopicStorage::get_all_heads(&mut topic_storage)?,
                     true,
                     *topic_id,
+                    TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
                 ),
             );
         }

@@ -372,6 +374,7 @@ impl RocksDbServerStorage {
                 TopicStorage::get_all_heads(&mut topic_storage)?,
                 false,
                 *topic,
+                TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
             ));
         }
         result.extend(rw_topics_added.into_values());

@@ -401,6 +404,7 @@ impl RocksDbServerStorage {
                     TopicStorage::get_all_heads(&mut topic_storage)?,
                     false,
                     *topic,
+                    TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
                 ));
             }
         }

@@ -478,6 +482,7 @@ impl RocksDbServerStorage {
             TopicStorage::get_all_heads(&mut topic_storage)?,
             is_publisher,
             *topic,
+            TopicStorage::COMMITS_NBR.get(&mut topic_storage)?,
         ))
     }

@@ -640,6 +645,8 @@ impl RocksDbServerStorage {
             let head = HashSet::from([commit_id]);
             //TODO: current_heads in TopicInfo in ServerBroker is not updated (but it isn't used so far)
             TopicStorage::HEADS.remove_from_set_and_add(&mut topic_storage, past, head)?;
+            TopicStorage::COMMITS_NBR.increment(&mut topic_storage)?;
         }
     }

@@ -652,6 +659,7 @@ impl RocksDbServerStorage {
         topic: &TopicId,
         known_heads: &Vec<ObjectId>,
         target_heads: &Vec<ObjectId>,
+        known_commits: &Option<BloomFilter>,
     ) -> Result<Vec<TopicSyncRes>, ServerError> {
         let overlay = self.check_overlay(overlay)?;
         // quick solution for now using the Branch::sync_req. TODO: use the saved references (ACKS,DEPS) in the server_storage, to have much quicker responses

@@ -670,7 +678,7 @@ impl RocksDbServerStorage {
         let store = Store::new_from_overlay_id(&overlay, Arc::clone(&self.block_storage));

-        let commits = Branch::sync_req(target_heads, known_heads, &store)
+        let commits = Branch::sync_req(target_heads, known_heads, known_commits, &store)
             .map_err(|_| ServerError::MalformedBranch)?;

         let mut result = Vec::with_capacity(commits.len());

ng-broker/src/server_broker.rs

@@ -312,6 +312,19 @@ impl IServerBroker for ServerBroker {
         self.storage.get_commit(overlay, id)
     }

+    fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey) {
+        for ((overlay, topic), peers) in self.local_subscriptions.iter_mut() {
+            if peers.remove(remote_peer) {
+                log_debug!(
+                    "subscription of peer {} to topic {} in overlay {} removed",
+                    remote_peer,
+                    topic,
+                    overlay
+                );
+            }
+        }
+    }
+
     fn dispatch_event(
         &self,
         overlay: &OverlayId,

@@ -344,8 +357,9 @@ impl IServerBroker for ServerBroker {
         topic: &TopicId,
         known_heads: &Vec<ObjectId>,
         target_heads: &Vec<ObjectId>,
+        known_commits: &Option<BloomFilter>,
     ) -> Result<Vec<TopicSyncRes>, ServerError> {
         self.storage
-            .topic_sync_req(overlay, topic, known_heads, target_heads)
+            .topic_sync_req(overlay, topic, known_heads, target_heads, known_commits)
     }
 }

ng-broker/src/server_storage/core/topic.rs

@@ -53,6 +53,7 @@ impl<'a> TopicStorage<'a> {
     pub const ADVERT: SingleValueColumn<Self, PublisherAdvert> = SingleValueColumn::new(b'a');
     pub const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r');
     pub const ROOT_COMMIT: SingleValueColumn<Self, ObjectId> = SingleValueColumn::new(b'o');
+    pub const COMMITS_NBR: CounterValue<Self> = CounterValue::new(b'n');

     // Topic <-> Users who pinned it (with boolean: R or W)
     pub const USERS: MultiMapColumn<Self, UserId, bool> = MultiMapColumn::new(b'u');

@@ -63,7 +64,11 @@ impl<'a> TopicStorage<'a> {
         "Topic",
         Some(Self::PREFIX),
         Some(&Self::REPO),
-        &[&Self::ADVERT as &dyn ISingleValueColumn, &Self::ROOT_COMMIT],
+        &[
+            &Self::ADVERT as &dyn ISingleValueColumn,
+            &Self::ROOT_COMMIT,
+            &Self::COMMITS_NBR,
+        ],
         &[&Self::USERS as &dyn IMultiValueColumn, &Self::HEADS],
     );

ng-net/src/actors/client/event.rs

@@ -82,7 +82,6 @@ impl EActor for Actor<'_, PublishEvent, ()> {
         // send a ProtocolError if invalid signatures (will disconnect the client)
         req.event().verify()?;

-        let broker = BROKER.read().await;
         let overlay = req.overlay().clone();
         let (user_id, remote_peer) = {
             let fsm = _fsm.lock().await;

@@ -91,10 +90,12 @@ impl EActor for Actor<'_, PublishEvent, ()> {
                 fsm.remote_peer().ok_or(ProtocolError::ActorError)?,
             )
         };
-        let res = broker
-            .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer)
-            .await;
+        let res = {
+            let mut broker = BROKER.write().await;
+            broker
+                .dispatch_event(&overlay, req.take_event(), &user_id, &remote_peer)
+                .await
+        };

         _fsm.lock()
             .await
             .send_in_reply_to(res.into(), self.id())

ng-net/src/actors/client/topic_sync_req.rs

@@ -34,6 +34,7 @@ impl TopicSyncReq {
             known_heads: vec![],
             target_heads: vec![],
             overlay: Some(*overlay),
+            known_commits: None,
         })
     }

@@ -42,12 +43,14 @@ impl TopicSyncReq {
         topic_id: &TopicId,
         known_heads: Vec<ObjectId>,
         target_heads: Vec<ObjectId>,
+        known_commits: Option<BloomFilter>,
     ) -> TopicSyncReq {
         TopicSyncReq::V0(TopicSyncReqV0 {
             topic: *topic_id,
             known_heads,
             target_heads,
             overlay: Some(repo.store.get_store_repo().overlay_id_for_read_purpose()),
+            known_commits,
         })
     }
 }

@@ -111,6 +114,7 @@ impl EActor for Actor<'_, TopicSyncReq, TopicSyncRes> {
             req.topic(),
             req.known_heads(),
             req.target_heads(),
+            req.known_commits(),
         );

         // IF NEEDED, the topic_sync_req could be changed to return a stream, and then the send_in_reply_to would be also totally async

ng-net/src/broker.rs

@@ -336,6 +336,11 @@ impl Broker {
                         let _ = self.remove_user_peer(&user, &peer_id);
                     }
                 }
+                let peer = PubKey::X25519PubKey(peer_id);
+                log_debug!("unsubscribing peer {}", peer);
+                self.get_server_broker_mut()
+                    .unwrap()
+                    .remove_all_subscriptions_of_peer(&peer);
             }
         }
         PeerConnection::Core(ip) => {

@@ -834,7 +839,7 @@ impl Broker {
                     local_broker.write().await.user_disconnected(user).await;
                 }
             } else {
-                log_info!("REMOVED");
+                log_debug!("REMOVED");
                 BROKER
                     .write()
                     .await

@@ -879,7 +884,7 @@ impl Broker {
     #[cfg(not(target_arch = "wasm32"))]
     pub async fn dispatch_event(
-        &self,
+        &mut self,
         overlay: &OverlayId,
         event: Event,
         user_id: &UserId,

@@ -887,12 +892,12 @@ impl Broker {
     ) -> Result<(), ServerError> {
         // TODO: deal with subscriptions on the outer overlay. for now we assume everything is on the inner overlay

-        let peers_for_local_dispatch = self.get_server_broker()?.dispatch_event(
-            overlay,
-            event.clone(),
-            user_id,
-            remote_peer,
-        )?;
+        let peers_for_local_dispatch: Vec<PubKey> = self
+            .get_server_broker()?
+            .dispatch_event(overlay, event.clone(), user_id, remote_peer)?
+            .into_iter()
+            .cloned()
+            .collect();

         //log_debug!("dispatch_event {:?}", peers_for_local_dispatch);

@@ -901,7 +906,7 @@ impl Broker {
             if let Some(BrokerPeerInfo {
                 connected: PeerConnection::Client(ConnectionBase { fsm: Some(fsm), .. }),
                 ..
-            }) = self.peers.get(&(None, peer.to_owned().to_dh()))
+            }) = self.peers.get(&(None, peer.to_dh()))
             {
                 //log_debug!("ForwardedEvent peer {:?}", peer);
                 let _ = fsm

@@ -915,6 +920,10 @@ impl Broker {
                         },
                     )))
                     .await;
+            } else {
+                // we remove the peer from all local_subscriptions
+                self.get_server_broker_mut()?
+                    .remove_all_subscriptions_of_peer(&peer);
             }
         }

ng-net/src/server_broker.rs

@@ -91,11 +91,14 @@ pub trait IServerBroker: Send + Sync {
         remote_peer: &PubKey,
     ) -> Result<HashSet<&PubKey>, ServerError>;

+    fn remove_all_subscriptions_of_peer(&mut self, remote_peer: &PubKey);
+
     fn topic_sync_req(
         &self,
         overlay: &OverlayId,
         topic: &TopicId,
         known_heads: &Vec<ObjectId>,
         target_heads: &Vec<ObjectId>,
+        known_commits: &Option<BloomFilter>,
     ) -> Result<Vec<TopicSyncRes>, ServerError>;
 }

ng-net/src/types.rs

@@ -1716,6 +1716,9 @@ pub struct TopicSyncReqV0 {
     /// if empty, the local HEAD at the responder is used instead
     pub target_heads: Vec<ObjectId>,

+    /// optional Bloom filter of all the commit IDs present locally (used in case of detected fork)
+    pub known_commits: Option<BloomFilter>,
+
     #[serde(skip)]
     pub overlay: Option<OverlayId>,
 }

@@ -1752,6 +1755,11 @@ impl TopicSyncReq {
             TopicSyncReq::V0(o) => &o.target_heads,
         }
     }
+    pub fn known_commits(&self) -> &Option<BloomFilter> {
+        match self {
+            TopicSyncReq::V0(o) => &o.known_commits,
+        }
+    }
 }

 /// Status of a Forwarded Peer, sent in the Advert

@@ -3380,6 +3388,7 @@ pub struct TopicSubResV0 {
     pub topic: TopicId,
     pub known_heads: Vec<ObjectId>,
     pub publisher: bool,
+    pub commits_nbr: u64,
 }

 /// Topic subscription response

@@ -3401,11 +3410,17 @@ impl TopicSubRes {
             Self::V0(v0) => v0.publisher,
         }
     }
-    pub fn new_from_heads(topics: HashSet<ObjectId>, publisher: bool, topic: TopicId) -> Self {
+    pub fn new_from_heads(
+        topics: HashSet<ObjectId>,
+        publisher: bool,
+        topic: TopicId,
+        commits_nbr: u64,
+    ) -> Self {
         TopicSubRes::V0(TopicSubResV0 {
             topic,
             known_heads: topics.into_iter().collect(),
             publisher,
+            commits_nbr,
         })
     }

     pub fn known_heads(&self) -> &Vec<ObjectId> {

@@ -3413,6 +3428,11 @@ impl TopicSubRes {
             Self::V0(v0) => &v0.known_heads,
         }
     }
+    pub fn commits_nbr(&self) -> u64 {
+        match self {
+            Self::V0(v0) => v0.commits_nbr,
+        }
+    }
 }

 impl From<TopicId> for TopicSubRes {

@@ -3421,6 +3441,7 @@ impl From<TopicId> for TopicSubRes {
             topic,
             known_heads: vec![],
             publisher: false,
+            commits_nbr: 0,
         })
     }
 }

@@ -3431,6 +3452,7 @@ impl From<PublisherAdvert> for TopicSubRes {
             topic: topic.topic_id().clone(),
             known_heads: vec![],
             publisher: true,
+            commits_nbr: 0,
         })
     }
 }
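Since TopicSyncReqV0 travels over the client protocol, the filter is shipped as plain parameters rather than as a fastbloom-rs object: the hash count k and the raw bit array f. A minimal round-trip sketch of that wire shape, using a hypothetical mirror struct (the real BloomFilter type lives in ng-repo and is only shown indirectly here) and serde_bare, the serializer used elsewhere in this commit:

// Hypothetical mirror of the BloomFilter wire type; the diff shows only its
// two fields indirectly (their_filter.k and their_filter.f in branch.rs).
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct BloomFilterWire {
    k: u32,     // number of hash functions, from filter.hashes()
    f: Vec<u8>, // raw bit array, from filter.get_u8_array().to_vec()
}

fn main() {
    let bf = BloomFilterWire { k: 7, f: vec![0u8; 120] };
    // BARE round-trip, mirroring how the protocol types are encoded
    let bytes = serde_bare::to_vec(&bf).unwrap();
    let back: BloomFilterWire = serde_bare::from_slice(&bytes).unwrap();
    assert_eq!(bf, back);
}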

ng-repo/src/branch.rs

@@ -13,8 +13,8 @@ use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt;

+use fastbloom_rs::{BloomFilter as Filter, Membership};
 use zeroize::Zeroize;
-// use fastbloom_rs::{BloomFilter as Filter, Membership};

 use crate::errors::*;
 #[allow(unused_imports)]

@@ -165,12 +165,20 @@ impl Branch {
         missing: &mut Option<&mut HashSet<ObjectId>>,
         future: Option<ObjectId>,
         theirs_found: &mut Option<&mut HashSet<ObjectId>>,
+        theirs_filter: &Option<Filter>,
     ) -> Result<(), ObjectParseError> {
         let id = cobj.id();

         // check if this commit object is present in theirs or has already been visited in the current walk
         // load deps, stop at the root(including it in visited) or if this is a commit object from known_heads
-        if !theirs.contains(&id) {
+        let found_in_filter = if let Some(filter) = theirs_filter {
+            filter.contains(id.slice())
+        } else {
+            false
+        };
+
+        if !found_in_filter && !theirs.contains(&id) {
             if let Some(past) = visited.get_mut(&id) {
                 // we update the future
                 if let Some(f) = future {

@@ -193,6 +201,7 @@ impl Branch {
                             missing,
                             Some(id),
                             theirs_found,
+                            theirs_filter,
                         )?;
                     }
                     Err(ObjectParseError::MissingBlocks(blocks)) => {

@@ -219,13 +228,9 @@ impl Branch {
     pub fn sync_req(
         target_heads: impl Iterator<Item = ObjectId>,
         known_heads: &[ObjectId],
-        //their_filter: &BloomFilter,
+        known_commits: &Option<BloomFilter>,
         store: &Store,
     ) -> Result<Vec<ObjectId>, ObjectParseError> {
-        //log_debug!(">> sync_req");
-        //log_debug!("   target_heads: {:?}", target_heads);
-        //log_debug!("   known_heads: {:?}", known_heads);
-
         // their commits
         let mut theirs: HashMap<ObjectId, DagNode> = HashMap::new();

@@ -240,6 +245,7 @@ impl Branch {
                 &mut None,
                 None,
                 &mut None,
+                &None,
             )?;
         }
         // we silently discard any load error on the known_heads as the responder might not know them (yet).

@@ -249,6 +255,10 @@ impl Branch {

         let theirs: HashSet<ObjectId> = theirs.keys().into_iter().cloned().collect();

+        let filter = known_commits
+            .as_ref()
+            .map(|their_filter| Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k));
+
         // collect all commits reachable from target_heads
         // up to the root or until encountering a commit from theirs
         for id in target_heads {

@@ -261,27 +271,12 @@ impl Branch {
                 &mut None,
                 None,
                 &mut None,
+                &filter,
             )?;
         }
         // we silently discard any load error on the target_heads as they can be wrong if the requester is confused about what the responder has locally.

-        //log_debug!("!! ours: {:?}", ours);
-        //log_debug!("!! theirs: {:?}", theirs);
-
-        // remove their_commits from result
-        // let filter = Filter::from_u8_array(their_filter.f.as_slice(), their_filter.k.into());
-        // for id in result.clone() {
-        //     match id {
-        //         Digest::Blake3Digest32(d) => {
-        //             if filter.contains(&d) {
-        //                 result.remove(&id);
-        //             }
-        //         }
-        //     }
-        // }
-        //log_debug!("!! result filtered: {:?}", result);
-
         // now ordering to respect causal partial order.
         let mut next_generations = HashSet::new();
         for (_, node) in visited.iter() {

@@ -297,6 +292,11 @@ impl Branch {
             result.append(&mut DagNode::collapse(first, &visited));
         }

+        // #[cfg(debug_assertions)]
+        // for _res in result.iter() {
+        //     log_debug!("sending missing commit {}", _res);
+        // }
+
         Ok(result)
     }
 }

@@ -574,7 +574,7 @@ mod test {
         let ids = Branch::sync_req(
             [t5.id, a6.id, a7.id].into_iter(),
             &[t5.id],
-            //&their_commits,
+            &None,
             &repo.store,
         )
        .unwrap();
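For orientation, here is how the two sides of sync_req now fit together, reduced to a self-contained sketch: the requester builds the filter the way verifier.rs does further down (FilterBuilder with a 1% false-positive rate, one add per known commit ID), ships (k, f) in the request, and the responder rebuilds it with from_u8_array and probes it with contains during the DAG walk above. This assumes fastbloom-rs 0.5 as pinned in ng-verifier/Cargo.toml; the two hard-coded 32-byte IDs stand in for real commit ObjectIds.

// Self-contained sketch of the Bloom filter round-trip introduced in this commit.
use fastbloom_rs::{BloomFilter, FilterBuilder, Hashes, Membership};

fn main() {
    // stand-ins for the commit IDs the requester has locally ("visited")
    let known: Vec<[u8; 32]> = vec![[1u8; 32], [2u8; 32]];

    // requester side (see verifier.rs): size the filter for the expected
    // number of elements with a 1% false-positive probability
    let mut config = FilterBuilder::new(known.len() as u64, 0.01);
    config.enable_repeat_insert(false);
    let mut filter = BloomFilter::new(config);
    for id in &known {
        filter.add(id);
    }
    // what TopicSyncReqV0.known_commits carries over the wire
    let k = filter.hashes();
    let f = filter.get_u8_array().to_vec();

    // responder side (see the recursive walk above): rebuild and probe
    let rebuilt = BloomFilter::from_u8_array(f.as_slice(), k);
    assert!(rebuilt.contains(&[1u8; 32])); // known commit: skip sending it
    // a miss is definitive (the requester certainly lacks that commit);
    // a hit can be a false positive, hence the CommitGet TODO in verifier.rs
}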

ng-repo/src/errors.rs

@@ -76,6 +76,7 @@ pub enum NgError {
     WrongUploadId,
     FileError(FileError),
     InternalError,
+    OxiGraphError(String),
 }

 impl Error for NgError {}

ng-repo/src/kcv_storage.rs

@@ -674,22 +674,6 @@ pub trait IMultiValueColumn {
     fn value_size(&self) -> Result<usize, StorageError>;
 }

-pub struct ExistentialValueColumn {
-    suffix: u8,
-}
-
-impl ISingleValueColumn for ExistentialValueColumn {
-    fn suffix(&self) -> u8 {
-        self.suffix
-    }
-}
-
-impl ExistentialValueColumn {
-    pub const fn new(suffix: u8) -> Self {
-        ExistentialValueColumn { suffix }
-    }
-}
-
 pub struct SingleValueColumn<Model: IModel, Value: Serialize + for<'a> Deserialize<'a>> {
     suffix: u8,
     phantom_value: PhantomData<Value>,

@@ -779,6 +763,108 @@ impl<Model: IModel, Value: Clone + Serialize + for<'d> Deserialize<'d>>
     }
 }

+///////////// Counter Value
+
+pub struct CounterValue<Model: IModel> {
+    suffix: u8,
+    phantom_model: PhantomData<Model>,
+}
+
+impl<Model: IModel> ISingleValueColumn for CounterValue<Model> {
+    fn suffix(&self) -> u8 {
+        self.suffix
+    }
+}
+
+impl<Model: IModel> CounterValue<Model> {
+    pub const fn new(suffix: u8) -> Self {
+        CounterValue {
+            suffix,
+            phantom_model: PhantomData,
+        }
+    }
+
+    pub fn increment(&self, model: &mut Model) -> Result<(), StorageError> {
+        model.storage().write_transaction(&mut |tx| {
+            let mut val: u64 = match tx.get(model.prefix(), model.key(), Some(self.suffix), &None) {
+                Ok(val_ser) => from_slice(&val_ser)?,
+                Err(StorageError::NotFound) => 0,
+                Err(e) => return Err(e),
+            };
+            val += 1;
+            let val_ser = to_vec(&val)?;
+            tx.put(
+                model.prefix(),
+                model.key(),
+                Some(self.suffix),
+                &val_ser,
+                &None,
+            )?;
+            Ok(())
+        })
+    }
+
+    /// returns true if the counter reached zero, and the property was removed
+    pub fn decrement(&self, model: &mut Model) -> Result<bool, StorageError> {
+        let mut ret: bool = false;
+        model.storage().write_transaction(&mut |tx| {
+            let val_ser = tx.get(model.prefix(), model.key(), Some(self.suffix), &None)?;
+            let mut val: u64 = from_slice(&val_ser)?;
+            val -= 1;
+            ret = val == 0;
+            if ret {
+                tx.del(model.prefix(), model.key(), Some(self.suffix), &None)?;
+            } else {
+                let val_ser = to_vec(&val)?;
+                tx.put(
+                    model.prefix(),
+                    model.key(),
+                    Some(self.suffix),
+                    &val_ser,
+                    &None,
+                )?;
+            }
+            Ok(())
+        })?;
+        Ok(ret)
+    }
+
+    pub fn get(&self, model: &mut Model) -> Result<u64, StorageError> {
+        let val_res = model
+            .storage()
+            .get(model.prefix(), model.key(), Some(self.suffix), &None);
+        match val_res {
+            Ok(val_ser) => Ok(from_slice(&val_ser)?),
+            Err(StorageError::NotFound) => Ok(0),
+            Err(e) => Err(e),
+        }
+    }
+
+    pub fn del(&self, model: &mut Model) -> Result<(), StorageError> {
+        model.check_exists()?;
+        model
+            .storage()
+            .del(model.prefix(), model.key(), Some(self.suffix), &None)
+    }
+}
+
+////////////////
+
+pub struct ExistentialValueColumn {
+    suffix: u8,
+}
+
+impl ISingleValueColumn for ExistentialValueColumn {
+    fn suffix(&self) -> u8 {
+        self.suffix
+    }
+}
+
+impl ExistentialValueColumn {
+    pub const fn new(suffix: u8) -> Self {
+        ExistentialValueColumn { suffix }
+    }
+}
+
 pub struct ExistentialValue<Column: Serialize + for<'d> Deserialize<'d>> {
     value: Option<Column>,
     value_ser: Vec<u8>,

ng-repo/src/repo.rs

@@ -98,6 +98,8 @@ pub struct BranchInfo {
     pub read_cap: ReadCap,

     pub current_heads: Vec<ObjectRef>,
+
+    pub commits_nbr: u64,
 }

 /// In memory Repository representation. With helper functions that access the underlying UserStore and keeps proxy of the values

@@ -173,6 +175,7 @@ impl Repo {
         }
         branch.current_heads = set.into_iter().cloned().collect();
         branch.current_heads.push(commit_ref);
+        branch.commits_nbr += 1;
         // we return the new current heads
         Ok(branch.current_heads.to_vec())
     } else {

ng-repo/src/store.rs

@@ -256,6 +256,7 @@ impl Store {
             topic_priv_key: Some(branch_topic_priv_key),
             read_cap: branch_read_cap,
             current_heads: vec![],
+            commits_nbr: 0,
         };

         Ok((branch_commit, add_branch_commit, branch_info))

@@ -609,6 +610,7 @@ impl Store {
             topic_priv_key: Some(topic_priv_key),
             read_cap: root_branch_readcap.clone(),
             current_heads: vec![sync_sig_on_root_branch_commit_ref],
+            commits_nbr: 0,
         };
         branches.push((root_branch.id, root_branch));

ng-repo/src/types.rs

@@ -43,6 +43,11 @@ impl Digest {
     pub fn from_slice(slice: [u8; 32]) -> Digest {
         Digest::Blake3Digest32(slice)
     }
+    pub fn slice(&self) -> &[u8; 32] {
+        match self {
+            Self::Blake3Digest32(o) => o,
+        }
+    }
 }

 impl fmt::Display for Digest {

ng-sdk-js/src/lib.rs

@@ -585,6 +585,17 @@ pub async fn doc_fetch_private_subscribe() -> Result<JsValue, String> {
     Ok(serde_wasm_bindgen::to_value(&request).unwrap())
 }

+#[cfg(target_arch = "wasm32")]
+#[wasm_bindgen]
+pub async fn doc_fetch_repo_subscribe(repo_id: String) -> Result<JsValue, String> {
+    let request = AppRequest::V0(AppRequestV0 {
+        command: AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)),
+        nuri: NuriV0::new_repo_target_from_string(repo_id).map_err(|e| e.to_string())?,
+        payload: None,
+    });
+    Ok(serde_wasm_bindgen::to_value(&request).unwrap())
+}
+
 // #[cfg(target_arch = "wasm32")]
 // #[wasm_bindgen]
 // pub async fn get_readcap() -> Result<JsValue, String> {

ng-verifier/Cargo.toml

@@ -36,6 +36,7 @@ web-time = "0.2.0"
 either = "1.8.1"
 futures = "0.3.24"
 async-trait = "0.1.64"
+fastbloom-rs = "0.5.3"

 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
 ng-storage-rocksdb = { path = "../ng-storage-rocksdb", version = "0.1.0" }

ng-verifier/src/commits/mod.rs

@@ -111,6 +111,7 @@ impl CommitVerifier for RootBranch {
                     topic_priv_key,
                     read_cap: reference.clone(),
                     current_heads: vec![reference.clone()],
+                    commits_nbr: 1,
                 };
                 let id = root_branch.id;
                 let branches = vec![(root_branch.id, root_branch)];

@@ -251,6 +252,7 @@ impl CommitVerifier for AddBranch {
                     topic_priv_key: None,
                     read_cap: v0.branch_read_cap.clone(),
                     current_heads: vec![],
+                    commits_nbr: 0,
                 };
                 verifier.add_branch_and_save(

ng-verifier/src/request_processor.rs

@@ -130,6 +130,14 @@ impl AppRequestCommandV0 {
                 };
                 Ok((repo_id, branch, store_repo))
             }
+            NuriTargetV0::Repo(repo_id) => {
+                let (branch, store_repo) = {
+                    let repo = verifier.repos.get(repo_id).ok_or(NgError::RepoNotFound)?;
+                    let branch = repo.main_branch().ok_or(NgError::BranchNotFound)?;
+                    (branch.id, repo.store.get_store_repo().clone())
+                };
+                Ok((*repo_id, branch, store_repo))
+            }
             _ => unimplemented!(),
         }
     }

ng-verifier/src/types.rs

@@ -221,6 +221,20 @@ pub struct NuriV0 {
 }

 impl NuriV0 {
+    pub fn new_repo_target_from_string(repo_id_string: String) -> Result<Self, NgError> {
+        let repo_id: RepoId = repo_id_string.as_str().try_into()?;
+        Ok(Self {
+            target: NuriTargetV0::Repo(repo_id),
+            entire_store: false,
+            object: None,
+            branch: None,
+            overlay: None,
+            access: vec![],
+            topic: None,
+            locator: vec![],
+        })
+    }
+
     pub fn new_private_store_target() -> Self {
         Self {
             target: NuriTargetV0::PrivateStore,

ng-verifier/src/user_storage/branch.rs

@@ -15,6 +15,8 @@ use serde_bare::to_vec;
 use ng_repo::errors::StorageError;
 use ng_repo::kcv_storage::prop;
 use ng_repo::kcv_storage::KCVStorage;
+#[allow(unused_imports)]
+use ng_repo::log::*;
 use ng_repo::repo::BranchInfo;
 use ng_repo::types::*;

@@ -33,8 +35,15 @@ impl<'a> BranchStorage<'a> {
     const PUBLISHER: u8 = b'p';
     const READ_CAP: u8 = b'r';
     const TOPIC: u8 = b't';
+    const COMMITS_NBR: u8 = b'n';

-    const ALL_PROPERTIES: [u8; 4] = [Self::TYPE, Self::PUBLISHER, Self::READ_CAP, Self::TOPIC];
+    const ALL_PROPERTIES: [u8; 5] = [
+        Self::TYPE,
+        Self::PUBLISHER,
+        Self::READ_CAP,
+        Self::TOPIC,
+        Self::COMMITS_NBR,
+    ];

     const PREFIX_HEADS: u8 = b'h';

@@ -136,6 +145,7 @@ impl<'a> BranchStorage<'a> {
             topic: prop(Self::TOPIC, &props)?,
             topic_priv_key: prop(Self::PUBLISHER, &props).ok(),
             current_heads: Self::get_all_heads(id, storage)?,
+            commits_nbr: prop(Self::COMMITS_NBR, &props).unwrap_or(0),
         };
         Ok(bs)
     }

@@ -227,6 +237,28 @@ impl<'a> BranchStorage<'a> {
                 key.append(&mut head_ser);
                 tx.put(Self::PREFIX_HEADS, &key, None, &vec![], &None)?;
             }
+            let mut val: u64 = match tx.get(Self::PREFIX, id_ser, Some(Self::COMMITS_NBR), &None) {
+                Ok(val_ser) => from_slice(&val_ser)?,
+                Err(StorageError::NotFound) => 0,
+                Err(e) => return Err(e),
+            };
+            val += 1;
+            let val_ser = to_vec(&val)?;
+            tx.put(
+                Self::PREFIX,
+                id_ser,
+                Some(Self::COMMITS_NBR),
+                &val_ser,
+                &None,
+            )?;
+            // log_info!(
+            //     "putting commit_nbr {} {:?} {} {:?}",
+            //     Self::PREFIX as char,
+            //     id_ser,
+            //     Self::COMMITS_NBR as char,
+            //     val_ser
+            // );
             Ok(())
         })
     }

ng-verifier/src/verifier.rs

@@ -21,6 +21,7 @@ use std::{collections::HashMap, sync::Arc};

 use async_std::stream::StreamExt;
 use async_std::sync::{Mutex, RwLockReadGuard};
+use fastbloom_rs::{BloomFilter as Filter, FilterBuilder, Hashes, Membership};
 use futures::channel::mpsc;
 use futures::SinkExt;
 use ng_repo::object::Object;

@@ -191,7 +192,7 @@ impl Verifier {
         // );
         if let Some(sender) = self.branch_subscriptions.get_mut(branch) {
             if sender.is_closed() {
-                //log_info!("closed so removed");
+                log_info!("closed so removed {}", branch);
                 self.branch_subscriptions.remove(branch);
             } else {
                 let _ = sender.send(response).await;

@@ -227,7 +228,7 @@ impl Verifier {
         }

         let fnonce = Box::new(move || {
-            //log_info!("CLOSE_CHANNEL");
+            log_info!("CLOSE_CHANNEL of subscription for branch {}", branch);
             if !tx.is_closed() {
                 tx.close_channel();
             }

@@ -739,7 +740,7 @@ impl Verifier {
             .ok_or(NgError::TopicNotFound)?
             .to_owned();

-        self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref);
+        self.update_branch_current_heads(&repo_id, &branch_id, past, commit_ref)?;

         if self.connected_server_id.is_some() {
             // send the event to the server already

@@ -965,6 +966,7 @@ impl Verifier {
                             branch,
                             repo_id,
                             topic.known_heads(),
+                            topic.commits_nbr(),
                         )
                         .await?;
                     break;

@@ -999,6 +1001,7 @@ impl Verifier {
                             branch,
                             repo_id,
                             topic.known_heads(),
+                            topic.commits_nbr(),
                         )
                         .await?;
                     break;

@@ -1042,6 +1045,7 @@ impl Verifier {
                     branch,
                     repo_id,
                     sub.known_heads(),
+                    sub.commits_nbr(),
                 )
                 .await?;
             }

@@ -1167,7 +1171,7 @@ impl Verifier {
         if res.is_ok() && !skip_heads_update {
             let commit_ref = commit.reference().unwrap();
             let past = commit.direct_causal_past();
-            self.update_branch_current_heads(repo_id, branch_id, past, commit_ref);
+            self.update_branch_current_heads(repo_id, branch_id, past, commit_ref)?;
             Ok(())
         } else {
             res

@@ -1291,8 +1295,14 @@ impl Verifier {
         branch_id: &BranchId,
         repo_id: &RepoId,
         remote_heads: &Vec<ObjectId>,
+        remote_commits_nbr: u64,
     ) -> Result<(), NgError> {
         let (store, msg, branch_secret) = {
+            if remote_commits_nbr == 0 || remote_heads.is_empty() {
+                log_info!("branch is new on the broker. doing nothing");
+                return Ok(());
+            }
+
             let repo = self.repos.get(repo_id).unwrap();
             let branch_info = repo.branch(branch_id)?;

@@ -1302,10 +1312,7 @@ impl Verifier {
             let ours_set: HashSet<Digest> = HashSet::from_iter(ours.clone());

             let theirs = HashSet::from_iter(remote_heads.clone().into_iter());
-            if theirs.is_empty() {
-                log_info!("branch is new on the broker. doing nothing");
-                return Ok(());
-            }
+
             if ours_set.difference(&theirs).count() == 0
                 && theirs.difference(&ours_set).count() == 0
             {

@@ -1326,6 +1333,7 @@ impl Verifier {
                         &mut None,
                         None,
                         &mut Some(&mut theirs_found),
+                        &None,
                     );
                 }
             }

@@ -1333,16 +1341,33 @@ impl Verifier {
             let theirs_not_found: Vec<ObjectId> =
                 theirs.difference(&theirs_found).cloned().collect();

-            if theirs_not_found.is_empty() {
+            let known_commits = if theirs_not_found.is_empty() {
                 return Ok(());
             } else {
-                // prepare bloom filter
-            }
+                if visited.is_empty() {
+                    None
+                } else {
+                    // prepare bloom filter
+                    let expected_elements =
+                        remote_commits_nbr + max(visited.len() as u64, branch_info.commits_nbr);
+                    let mut config = FilterBuilder::new(expected_elements, 0.01);
+                    config.enable_repeat_insert(false);
+                    let mut filter = Filter::new(config);
+                    for commit_id in visited.keys() {
+                        filter.add(commit_id.slice());
+                    }
+                    Some(BloomFilter {
+                        k: filter.hashes(),
+                        f: filter.get_u8_array().to_vec(),
+                    })
+                }
+            };

             let msg = TopicSyncReq::V0(TopicSyncReqV0 {
                 topic: branch_info.topic,
                 known_heads: ours_set.union(&theirs_found).into_iter().cloned().collect(),
                 target_heads: theirs_not_found,
+                known_commits,
                 overlay: Some(store.overlay_for_read_on_client_protocol()),
             });
             (store, msg, branch_info.read_cap.key.clone())

@@ -1359,6 +1384,8 @@ impl Verifier {
                 .event()
                 .open(&store, repo_id, branch_id, &branch_secret)?;

+            // TODO: deal with missing commits in the DAG (fetch them individually with CommitGet). This can happen because of false positive on BloomFilter
+
             self.verify_commit(&commit, branch_id, repo_id, Arc::clone(&store))
                 .await?;
         }

@@ -1940,7 +1967,7 @@ impl Verifier {
                     // and have oxigraph use directly the UserStorage
                     Some(
                         oxigraph::store::Store::open_with_key(path_oxi, config.user_master_key)
-                            .unwrap(),
+                            .map_err(|e| NgError::OxiGraphError(e.to_string()))?,
                     ),
                     Some(Box::new(RocksDbUserStorage::open(
                         &path_user,
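For a sense of scale: with the 1% false-positive probability chosen above, a Bloom filter needs about m = -n·ln(p)/(ln 2)² ≈ 9.6 bits per element and k = 7 hash functions, so even a branch with 10,000 locally known commits adds only roughly 12 KB of known_commits payload to the TopicSyncReq.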

ng-wallet/src/emojis.rs

@@ -427,7 +427,7 @@ const sport: [EmojiDef<'static>; 15] = [
     EmojiDef {
         hexcode: "1f6a3",
         shortcode: "person_rowing_boat",
-        code: "boat",
+        code: "rowing_boat",
     },
     EmojiDef {
         hexcode: "1f3ca",

ng-wallet/src/types.rs

@@ -133,6 +133,7 @@ impl SessionWalletStorageV0 {
 pub struct SessionInfo {
     pub session_id: u64,
     pub user: UserId,
+    pub private_store_id: String,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
