Rust implementation of NextGraph, a Decentralized and local-first web 3.0 ecosystem
https://nextgraph.org
byzantine-fault-tolerancecrdtsdappsdecentralizede2eeeventual-consistencyjson-ldlocal-firstmarkdownocapoffline-firstp2pp2p-networkprivacy-protectionrdfrich-text-editorself-hostedsemantic-websparqlweb3collaboration
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
752 lines
24 KiB
752 lines
24 KiB
/*
|
|
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
|
|
* All rights reserved.
|
|
* Licensed under the Apache License, Version 2.0
|
|
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
|
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
|
* at your option. All files in the project carrying such
|
|
* notice may not be copied, modified, or distributed except
|
|
* according to those terms.
|
|
*/
|
|
|
|
//! Implementation of the Server Broker
|
|
|
|
use std::{
|
|
collections::{HashMap, HashSet},
|
|
path::PathBuf,
|
|
sync::Arc,
|
|
};
|
|
|
|
use async_std::sync::{Mutex, RwLock};
|
|
use either::Either;
|
|
use futures::StreamExt;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use ng_repo::{
|
|
block_storage::BlockStorage,
|
|
errors::{NgError, ProtocolError, ServerError},
|
|
log::*,
|
|
types::*,
|
|
};
|
|
|
|
use ng_net::{
|
|
app_protocol::*,
|
|
broker::{ClientPeerId, BROKER},
|
|
connection::NoiseFSM,
|
|
server_broker::IServerBroker,
|
|
types::*,
|
|
};
|
|
|
|
use ng_verifier::{
|
|
site::SiteV0,
|
|
types::{BrokerPeerId, VerifierConfig, VerifierConfigType},
|
|
verifier::Verifier,
|
|
};
|
|
|
|
use crate::rocksdb_server_storage::RocksDbServerStorage;
|
|
|
|
pub struct TopicInfo {
|
|
pub repo: RepoHash,
|
|
|
|
pub publisher_advert: Option<PublisherAdvert>,
|
|
|
|
pub current_heads: HashSet<ObjectId>,
|
|
|
|
pub root_commit: Option<ObjectId>,
|
|
|
|
/// indicates which users have opened the topic (boolean says if as publisher or not)
|
|
pub users: HashMap<UserId, bool>,
|
|
}
|
|
|
|
pub struct RepoInfo {
|
|
/// set of users that requested the repo to be exposed on the outer overlay
|
|
/// only possible if the user is a publisher
|
|
pub expose_outer: HashSet<UserId>,
|
|
|
|
/// set of topics of this repo
|
|
pub topics: HashSet<TopicId>,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
pub struct EventInfo {
|
|
pub event: Event,
|
|
pub blocks: Vec<BlockId>,
|
|
}
|
|
|
|
pub struct CommitInfo {
|
|
pub event: Either<EventInfo, TopicId>,
|
|
pub home_pinned: bool,
|
|
pub acks: HashSet<ObjectId>,
|
|
pub deps: HashSet<ObjectId>,
|
|
pub futures: HashSet<ObjectId>,
|
|
pub files: HashSet<ObjectId>,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
|
pub enum OverlayType {
|
|
OuterOnly,
|
|
Outer(OverlayId), // the ID of the inner overlay corresponding to this outer.
|
|
Inner(OverlayId), // the ID of the outer overlay corresponding to the inner
|
|
InnerOnly,
|
|
}
|
|
|
|
impl OverlayType {
|
|
pub fn is_inner_get_outer(&self) -> Option<&OverlayId> {
|
|
match self {
|
|
Self::Inner(outer) => Some(outer),
|
|
_ => None,
|
|
}
|
|
}
|
|
pub fn is_outer_to_inner(&self) -> bool {
|
|
match self {
|
|
Self::Outer(_) => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
pub fn is_outer_only(&self) -> bool {
|
|
match self {
|
|
Self::OuterOnly => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<OverlayAccess> for OverlayType {
|
|
fn from(oa: OverlayAccess) -> OverlayType {
|
|
match oa {
|
|
OverlayAccess::ReadOnly(_) => {
|
|
panic!("cannot create an OverlayType from a ReadOnly OverlayAccess")
|
|
}
|
|
OverlayAccess::ReadWrite((_inner, outer)) => OverlayType::Inner(outer),
|
|
OverlayAccess::WriteOnly(_inner) => OverlayType::InnerOnly,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub(crate) struct OverlayInfo {
|
|
pub overlay_type: OverlayType,
|
|
pub overlay_topic: Option<TopicId>,
|
|
pub topics: HashMap<TopicId, TopicInfo>,
|
|
pub repos: HashMap<RepoHash, RepoInfo>,
|
|
}
|
|
|
|
struct DetachableVerifier {
|
|
detach: bool,
|
|
attached: Option<(DirectPeerId, u64)>,
|
|
verifier: Verifier,
|
|
}
|
|
|
|
pub struct ServerBrokerState {
|
|
#[allow(dead_code)]
|
|
overlays: HashMap<OverlayId, OverlayInfo>,
|
|
#[allow(dead_code)]
|
|
inner_overlays: HashMap<OverlayId, Option<OverlayId>>,
|
|
|
|
local_subscriptions: HashMap<(OverlayId, TopicId), HashMap<PubKey, Option<UserId>>>,
|
|
|
|
verifiers: HashMap<UserId, Arc<RwLock<DetachableVerifier>>>,
|
|
remote_apps: HashMap<(DirectPeerId, u64), UserId>,
|
|
}
|
|
|
|
pub struct ServerBroker {
|
|
storage: RocksDbServerStorage,
|
|
|
|
state: RwLock<ServerBrokerState>,
|
|
|
|
path_users: PathBuf,
|
|
}
|
|
|
|
impl ServerBroker {
|
|
pub(crate) fn new(storage: RocksDbServerStorage, path_users: PathBuf) -> Self {
|
|
ServerBroker {
|
|
storage: storage,
|
|
state: RwLock::new(ServerBrokerState {
|
|
overlays: HashMap::new(),
|
|
inner_overlays: HashMap::new(),
|
|
local_subscriptions: HashMap::new(),
|
|
verifiers: HashMap::new(),
|
|
remote_apps: HashMap::new(),
|
|
}),
|
|
|
|
path_users,
|
|
}
|
|
}
|
|
|
|
/// Placeholder for loading broker state; currently does nothing and always succeeds.
pub fn load(&mut self) -> Result<(), NgError> {
    Ok(())
}
|
|
|
|
async fn add_subscription(
|
|
&self,
|
|
overlay: OverlayId,
|
|
topic: TopicId,
|
|
peer: ClientPeerId,
|
|
) -> Result<(), ServerError> {
|
|
let mut lock = self.state.write().await;
|
|
let peers_map = lock
|
|
.local_subscriptions
|
|
.entry((overlay, topic))
|
|
.or_insert(HashMap::with_capacity(1));
|
|
|
|
log_debug!(
|
|
"SUBSCRIBING PEER {:?} TOPIC {} OVERLAY {}",
|
|
peer,
|
|
topic,
|
|
overlay
|
|
);
|
|
|
|
if peers_map.insert(*peer.key(), peer.value()).is_some() {
|
|
//return Err(ServerError::PeerAlreadySubscribed);
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
async fn remove_subscription(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
topic: &TopicId,
|
|
peer: &PubKey,
|
|
) -> Result<(), ServerError> {
|
|
let mut lock = self.state.write().await;
|
|
let peers_set = lock
|
|
.local_subscriptions
|
|
.get_mut(&(*overlay, *topic))
|
|
.ok_or(ServerError::SubscriptionNotFound)?;
|
|
|
|
if peers_set.remove(peer).is_none() {
|
|
return Err(ServerError::SubscriptionNotFound);
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn new_verifier_from_credentials(
|
|
&self,
|
|
user_id: &UserId,
|
|
credentials: Credentials,
|
|
local_peer_id: DirectPeerId,
|
|
partial_credentials: bool,
|
|
) -> Result<Verifier, NgError> {
|
|
let block_storage = self.get_block_storage();
|
|
let mut path = self.get_path_users();
|
|
let user_hash: Digest = user_id.into();
|
|
path.push(user_hash.to_string());
|
|
std::fs::create_dir_all(path.clone()).unwrap();
|
|
let peer_id_dh = credentials.peer_priv_key.to_pub().to_dh_from_ed();
|
|
let mut verifier = Verifier::new(
|
|
VerifierConfig {
|
|
config_type: VerifierConfigType::RocksDb(path),
|
|
user_master_key: *credentials.user_master_key.slice(),
|
|
peer_priv_key: credentials.peer_priv_key,
|
|
user_priv_key: credentials.user_key,
|
|
private_store_read_cap: if partial_credentials {
|
|
None
|
|
} else {
|
|
Some(credentials.read_cap)
|
|
},
|
|
private_store_id: if partial_credentials {
|
|
None
|
|
} else {
|
|
Some(credentials.private_store)
|
|
},
|
|
protected_store_id: if partial_credentials {
|
|
None
|
|
} else {
|
|
Some(credentials.protected_store)
|
|
},
|
|
public_store_id: if partial_credentials {
|
|
None
|
|
} else {
|
|
Some(credentials.public_store)
|
|
},
|
|
},
|
|
block_storage,
|
|
)?;
|
|
if !partial_credentials {
|
|
verifier.connected_broker = BrokerPeerId::Local(local_peer_id);
|
|
// start the local transport connection
|
|
let mut lock = BROKER.write().await;
|
|
lock.connect_local(peer_id_dh, *user_id)?;
|
|
}
|
|
Ok(verifier)
|
|
}
|
|
}
|
|
|
|
//TODO: the purpose of this trait is to provide a level of indirection so that some data can be kept in memory (cache), avoiding a hit on the storage backend (rocksdb) at every call.
//For now this cache is not implemented, but the structs are ready (see above); implementing it would only require slight changes to the trait functions below.
|
|
#[async_trait::async_trait]
|
|
impl IServerBroker for ServerBroker {
|
|
fn get_block_storage(
|
|
&self,
|
|
) -> std::sync::Arc<std::sync::RwLock<dyn BlockStorage + Send + Sync>> {
|
|
self.storage.get_block_storage()
|
|
}
|
|
|
|
fn get_path_users(&self) -> PathBuf {
|
|
self.path_users.clone()
|
|
}
|
|
|
|
/// Asks the storage backend whether `block_id` exists in `overlay_id`.
///
/// The result (including the error used for a missing block) is whatever the storage
/// backend returns.
fn has_block(&self, overlay_id: &OverlayId, block_id: &BlockId) -> Result<(), ServerError> {
    self.storage.has_block(overlay_id, block_id)
}
|
|
|
|
/// Fetches the block `block_id` of `overlay_id` from the storage backend.
fn get_block(&self, overlay_id: &OverlayId, block_id: &BlockId) -> Result<Block, ServerError> {
    self.storage.get_block(overlay_id, block_id)
}
|
|
|
|
/// Forwards the sequence number `seq` of `peer` to the storage backend.
///
/// NOTE(review): presumably validates and records the next expected sequence number;
/// the exact rule lives in the storage implementation.
fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> {
    self.storage.next_seq_for_peer(peer, seq)
}
|
|
|
|
/// Stores `block` in `overlay_id` through the storage backend.
///
/// Whatever `add_block` returns on success is discarded.
fn put_block(&self, overlay_id: &OverlayId, block: Block) -> Result<(), ServerError> {
    let _ = self.storage.add_block(overlay_id, block)?;
    Ok(())
}
|
|
async fn create_user(&self, broker_id: &DirectPeerId) -> Result<UserId, ProtocolError> {
|
|
let user_privkey = PrivKey::random_ed();
|
|
let user_id = user_privkey.to_pub();
|
|
let mut creds = Credentials::new_partial(&user_privkey);
|
|
let mut verifier = self
|
|
.new_verifier_from_credentials(&user_id, creds.clone(), *broker_id, true)
|
|
.await?;
|
|
let _site = SiteV0::create_personal(user_privkey.clone(), &mut verifier)
|
|
.await
|
|
.map_err(|e| {
|
|
log_err!("create_personal failed with {e}");
|
|
ProtocolError::BrokerError
|
|
})?;
|
|
|
|
// update credentials from config of verifier.
|
|
verifier.complement_credentials(&mut creds);
|
|
//verifier.close().await;
|
|
// save credentials and user
|
|
self.add_user_credentials(&user_id, &creds)?;
|
|
|
|
verifier.connected_broker = BrokerPeerId::Local(*broker_id);
|
|
|
|
// start the local transport connection
|
|
{
|
|
let mut lock = BROKER.write().await;
|
|
let peer_id_dh = creds.peer_priv_key.to_pub().to_dh_from_ed();
|
|
lock.connect_local(peer_id_dh, user_id)?;
|
|
}
|
|
let _res = verifier.send_outbox().await;
|
|
if _res.is_err() {
|
|
log_err!("{:?}", _res);
|
|
}
|
|
|
|
Ok(user_id)
|
|
}
|
|
|
|
/// Looks up `user_id` in the storage backend.
///
/// NOTE(review): the meaning of the returned boolean is defined by the storage layer
/// (presumably the admin flag); confirm there.
fn get_user(&self, user_id: PubKey) -> Result<bool, ProtocolError> {
    self.storage.get_user(user_id)
}
|
|
fn add_user_credentials(
|
|
&self,
|
|
user_id: &PubKey,
|
|
credentials: &Credentials,
|
|
) -> Result<(), ProtocolError> {
|
|
self.storage.add_user_credentials(user_id, credentials)
|
|
}
|
|
/// Reads the stored credentials of `user_id` from the storage backend.
fn get_user_credentials(&self, user_id: &PubKey) -> Result<Credentials, ProtocolError> {
    self.storage.get_user_credentials(user_id)
}
|
|
/// Adds `user_id` to the storage backend, with the given `is_admin` flag.
fn add_user(&self, user_id: PubKey, is_admin: bool) -> Result<(), ProtocolError> {
    self.storage.add_user(user_id, is_admin)
}
|
|
|
|
/// Deletes `user_id` from the storage backend.
fn del_user(&self, user_id: PubKey) -> Result<(), ProtocolError> {
    self.storage.del_user(user_id)
}
|
|
/// Lists users from the storage backend; `admins` is passed through as the filter flag.
fn list_users(&self, admins: bool) -> Result<Vec<PubKey>, ProtocolError> {
    self.storage.list_users(admins)
}
|
|
fn list_invitations(
|
|
&self,
|
|
admin: bool,
|
|
unique: bool,
|
|
multi: bool,
|
|
) -> Result<Vec<(InvitationCode, u32, Option<String>)>, ProtocolError> {
|
|
self.storage.list_invitations(admin, unique, multi)
|
|
}
|
|
fn add_invitation(
|
|
&self,
|
|
invite_code: &InvitationCode,
|
|
expiry: u32,
|
|
memo: &Option<String>,
|
|
) -> Result<(), ProtocolError> {
|
|
self.storage.add_invitation(invite_code, expiry, memo)
|
|
}
|
|
/// Returns the type byte the storage backend holds for the invitation `invite_code`.
fn get_invitation_type(&self, invite_code: [u8; 32]) -> Result<u8, ProtocolError> {
    self.storage.get_invitation_type(invite_code)
}
|
|
/// Removes the invitation `invite_code` from the storage backend.
fn remove_invitation(&self, invite_code: [u8; 32]) -> Result<(), ProtocolError> {
    self.storage.remove_invitation(invite_code)
}
|
|
|
|
async fn app_process_request(
|
|
&self,
|
|
req: AppRequest,
|
|
request_id: i64,
|
|
fsm: &Mutex<NoiseFSM>,
|
|
) -> Result<(), ServerError> {
|
|
// get the session
|
|
let remote = {
|
|
fsm.lock()
|
|
.await
|
|
.remote_peer()
|
|
.ok_or(ServerError::SessionNotFound)?
|
|
};
|
|
|
|
let session_id = (remote, req.session_id());
|
|
let session_lock = {
|
|
let lock = self.state.read().await;
|
|
let user_id = lock
|
|
.remote_apps
|
|
.get(&session_id)
|
|
.ok_or(ServerError::SessionNotFound)?
|
|
.to_owned();
|
|
|
|
Arc::clone(
|
|
lock.verifiers
|
|
.get(&user_id)
|
|
.ok_or(ServerError::SessionNotFound)?,
|
|
)
|
|
};
|
|
|
|
let mut session = session_lock.write().await;
|
|
|
|
if session.attached.is_none() || session.attached.unwrap() != session_id {
|
|
return Err(ServerError::SessionDetached);
|
|
}
|
|
|
|
if req.command().is_stream() {
|
|
let res = session.verifier.app_request_stream(req).await;
|
|
|
|
match res {
|
|
Err(e) => {
|
|
let error: ServerError = e.into();
|
|
let error_res: AppMessage = error.into();
|
|
fsm.lock()
|
|
.await
|
|
.send_in_reply_to(error_res.into(), request_id)
|
|
.await?;
|
|
}
|
|
Ok((mut receiver, _cancel)) => {
|
|
//TODO: implement cancel
|
|
let mut some_sent = false;
|
|
while let Some(response) = receiver.next().await {
|
|
some_sent = true;
|
|
let mut msg: AppMessage = response.into();
|
|
msg.set_result(ServerError::PartialContent.into());
|
|
fsm.lock()
|
|
.await
|
|
.send_in_reply_to(msg.into(), request_id)
|
|
.await?;
|
|
}
|
|
let end: Result<EmptyAppResponse, ServerError> = if some_sent {
|
|
Err(ServerError::EndOfStream)
|
|
} else {
|
|
Err(ServerError::EmptyStream)
|
|
};
|
|
fsm.lock()
|
|
.await
|
|
.send_in_reply_to(end.into(), request_id)
|
|
.await?;
|
|
}
|
|
}
|
|
} else {
|
|
let res = session.verifier.app_request(req).await;
|
|
//log_debug!("GOT RES {:?}", res);
|
|
let app_message: AppMessage = match res {
|
|
Err(e) => {
|
|
log_debug!("AppRequest error NgError {e}");
|
|
let server_err: ServerError = e.into();
|
|
server_err.into()
|
|
}
|
|
Ok(app_res) => app_res.into(),
|
|
};
|
|
fsm.lock()
|
|
.await
|
|
.send_in_reply_to(app_message.into(), request_id)
|
|
.await?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn app_session_start(
|
|
&self,
|
|
req: AppSessionStart,
|
|
remote: DirectPeerId,
|
|
local_peer_id: DirectPeerId,
|
|
) -> Result<AppSessionStartResponse, ServerError> {
|
|
let user_id = req.user_id();
|
|
let id = (remote, req.session_id());
|
|
let verifier_lock_res = {
|
|
let lock = self.state.read().await;
|
|
lock.verifiers.get(user_id).map(|l| Arc::clone(l))
|
|
};
|
|
let verifier_lock = match verifier_lock_res {
|
|
Some(session_lock) => {
|
|
let mut session = session_lock.write().await;
|
|
if let Some((peer_id, session_id)) = session.attached {
|
|
if peer_id != remote || session_id == req.session_id() {
|
|
// remove the previous session
|
|
let mut write_lock = self.state.write().await;
|
|
let _ = write_lock.remote_apps.remove(&(peer_id, session_id));
|
|
}
|
|
}
|
|
session.attached = Some(id);
|
|
Arc::clone(&session_lock)
|
|
}
|
|
None => {
|
|
// we create and load a new verifier
|
|
|
|
let credentials = if req.credentials().is_none() {
|
|
// headless do not have credentials. we fetch them from server_storage
|
|
self.storage.get_user_credentials(user_id)?
|
|
} else {
|
|
req.credentials().clone().unwrap()
|
|
};
|
|
|
|
if *user_id != credentials.user_key.to_pub() {
|
|
log_debug!("InvalidRequest");
|
|
return Err(ServerError::InvalidRequest);
|
|
}
|
|
|
|
let verifier = self
|
|
.new_verifier_from_credentials(user_id, credentials, local_peer_id, false)
|
|
.await;
|
|
if verifier.is_err() {
|
|
log_err!(
|
|
"new_verifier failed with: {:?}",
|
|
verifier.as_ref().unwrap_err()
|
|
);
|
|
}
|
|
let mut verifier = verifier?;
|
|
|
|
// TODO : key.zeroize();
|
|
|
|
//load verifier from local_storage
|
|
let _ = verifier.load();
|
|
//TODO: save opened_branches in user_storage, so that when we open again the verifier, the syncing can work
|
|
verifier.sync().await;
|
|
|
|
let session = DetachableVerifier {
|
|
detach: true,
|
|
attached: Some(id),
|
|
verifier,
|
|
};
|
|
let mut write_lock = self.state.write().await;
|
|
Arc::clone(
|
|
write_lock
|
|
.verifiers
|
|
.entry(*user_id)
|
|
.or_insert(Arc::new(RwLock::new(session))),
|
|
)
|
|
}
|
|
};
|
|
let verifier = &verifier_lock.read().await.verifier;
|
|
let res = AppSessionStartResponse::V0(AppSessionStartResponseV0 {
|
|
private_store: *verifier.private_store_id(),
|
|
protected_store: *verifier.protected_store_id(),
|
|
public_store: *verifier.public_store_id(),
|
|
});
|
|
let mut write_lock = self.state.write().await;
|
|
if let Some(previous_user) = write_lock.remote_apps.insert(id, *user_id) {
|
|
// weird. another session was opened for this id.
|
|
// we have to stop it otherwise it would be dangling.
|
|
if previous_user != *user_id {
|
|
if let Some(previous_session_lock) = write_lock
|
|
.verifiers
|
|
.get(&previous_user)
|
|
.map(|v| Arc::clone(v))
|
|
{
|
|
let mut previous_session = previous_session_lock.write().await;
|
|
if previous_session.detach {
|
|
previous_session.attached = None;
|
|
} else {
|
|
// we stop it and drop it
|
|
let verifier = write_lock.verifiers.remove(&previous_user);
|
|
verifier.unwrap().read().await.verifier.close().await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(res)
|
|
}
|
|
|
|
async fn app_session_stop(
|
|
&self,
|
|
req: AppSessionStop,
|
|
remote_peer_id: &DirectPeerId,
|
|
) -> Result<EmptyAppResponse, ServerError> {
|
|
let id = (*remote_peer_id, req.session_id());
|
|
|
|
let mut write_lock = self.state.write().await;
|
|
let must_be_destroyed = {
|
|
let session_user = write_lock
|
|
.remote_apps
|
|
.remove(&id)
|
|
.ok_or(ServerError::SessionNotFound)?;
|
|
let session = Arc::clone(
|
|
write_lock
|
|
.verifiers
|
|
.get(&session_user)
|
|
.ok_or(ServerError::SessionNotFound)?,
|
|
);
|
|
let mut verifier_lock = session.write().await;
|
|
if !req.is_force_close() && verifier_lock.detach {
|
|
verifier_lock.attached = None;
|
|
None
|
|
} else {
|
|
Some(session_user)
|
|
}
|
|
};
|
|
if let Some(user) = must_be_destroyed {
|
|
let verifier = write_lock.verifiers.remove(&user);
|
|
verifier.unwrap().read().await.verifier.close().await;
|
|
}
|
|
Ok(EmptyAppResponse(()))
|
|
}
|
|
|
|
fn get_repo_pin_status(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
repo: &RepoHash,
|
|
user: &UserId,
|
|
) -> Result<RepoPinStatus, ServerError> {
|
|
self.storage.get_repo_pin_status(overlay, repo, user)
|
|
}
|
|
|
|
async fn pin_repo_write(
|
|
&self,
|
|
overlay: &OverlayAccess,
|
|
repo: &RepoHash,
|
|
user_id: &UserId,
|
|
ro_topics: &Vec<TopicId>,
|
|
rw_topics: &Vec<PublisherAdvert>,
|
|
overlay_root_topic: &Option<TopicId>,
|
|
expose_outer: bool,
|
|
peer: &ClientPeerId,
|
|
) -> Result<RepoOpened, ServerError> {
|
|
let res = self.storage.pin_repo_write(
|
|
overlay,
|
|
repo,
|
|
user_id,
|
|
ro_topics,
|
|
rw_topics,
|
|
overlay_root_topic,
|
|
expose_outer,
|
|
)?;
|
|
|
|
for topic in res.iter() {
|
|
self.add_subscription(
|
|
*overlay.overlay_id_for_client_protocol_purpose(),
|
|
*topic.topic_id(),
|
|
peer.clone(),
|
|
)
|
|
.await?;
|
|
}
|
|
Ok(res)
|
|
}
|
|
|
|
async fn pin_repo_read(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
repo: &RepoHash,
|
|
user_id: &UserId,
|
|
ro_topics: &Vec<TopicId>,
|
|
peer: &ClientPeerId,
|
|
) -> Result<RepoOpened, ServerError> {
|
|
let res = self
|
|
.storage
|
|
.pin_repo_read(overlay, repo, user_id, ro_topics)?;
|
|
|
|
for topic in res.iter() {
|
|
// TODO: those outer subscriptions are not handled yet. they will not emit events.
|
|
self.add_subscription(*overlay, *topic.topic_id(), peer.clone())
|
|
.await?;
|
|
}
|
|
Ok(res)
|
|
}
|
|
|
|
async fn topic_sub(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
repo: &RepoHash,
|
|
topic: &TopicId,
|
|
user: &UserId,
|
|
publisher: Option<&PublisherAdvert>,
|
|
peer: &ClientPeerId,
|
|
) -> Result<TopicSubRes, ServerError> {
|
|
let res = self
|
|
.storage
|
|
.topic_sub(overlay, repo, topic, user, publisher)?;
|
|
self.add_subscription(*overlay, *topic, peer.clone())
|
|
.await?;
|
|
Ok(res)
|
|
}
|
|
|
|
/// Fetches the blocks of the commit `id` in `overlay` from the storage backend.
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> {
    self.storage.get_commit(overlay, id)
}
|
|
|
|
async fn remove_all_subscriptions_of_client(&self, client: &ClientPeerId) {
|
|
let remote_peer = client.key();
|
|
let mut lock = self.state.write().await;
|
|
for ((overlay, topic), peers) in lock.local_subscriptions.iter_mut() {
|
|
if peers.remove(remote_peer).is_some() {
|
|
log_debug!(
|
|
"subscription of peer {} to topic {} in overlay {} removed",
|
|
remote_peer,
|
|
topic,
|
|
overlay
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn dispatch_event(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
event: Event,
|
|
user_id: &UserId,
|
|
remote_peer: &PubKey,
|
|
) -> Result<Vec<ClientPeerId>, ServerError> {
|
|
let topic = self.storage.save_event(overlay, event, user_id)?;
|
|
|
|
// log_debug!(
|
|
// "DISPATCH EVENT {} {} {:?}",
|
|
// overlay,
|
|
// topic,
|
|
// self.local_subscriptions
|
|
// );
|
|
let lock = self.state.read().await;
|
|
let mut map = lock
|
|
.local_subscriptions
|
|
.get(&(*overlay, topic))
|
|
.map(|map| map.iter().collect())
|
|
.unwrap_or(HashMap::new());
|
|
|
|
map.remove(remote_peer);
|
|
Ok(map
|
|
.iter()
|
|
.map(|(k, v)| ClientPeerId::new_from(k, v))
|
|
.collect())
|
|
}
|
|
|
|
fn topic_sync_req(
|
|
&self,
|
|
overlay: &OverlayId,
|
|
topic: &TopicId,
|
|
known_heads: &Vec<ObjectId>,
|
|
target_heads: &Vec<ObjectId>,
|
|
known_commits: &Option<BloomFilter>,
|
|
) -> Result<Vec<TopicSyncRes>, ServerError> {
|
|
self.storage
|
|
.topic_sync_req(overlay, topic, known_heads, target_heads, known_commits)
|
|
}
|
|
}
|
|
|