pull/19/head
Niko PLP 6 months ago
parent 508c7bf177
commit 1fa0eb0dc7
  1. 168
      ng-broker/src/rocksdb_server_storage.rs
  2. 83
      ng-broker/src/server_broker.rs
  3. 2
      ng-broker/src/server_storage/core/commit.rs
  4. 14
      ng-broker/src/server_storage/core/overlay.rs
  5. 34
      ng-broker/src/server_storage/core/repo.rs
  6. 9
      ng-broker/src/server_storage/core/topic.rs
  7. 2
      ng-net/src/actors/client/commit_get.rs
  8. 74
      ng-net/src/actors/client/pin_repo.rs
  9. 8
      ng-net/src/actors/client/repo_pin_status.rs
  10. 1
      ng-net/src/actors/client/topic_sub.rs
  11. 7
      ng-net/src/connection.rs
  12. 17
      ng-net/src/server_broker.rs
  13. 82
      ng-net/src/types.rs
  14. 24
      ng-repo/src/errors.rs
  15. 116
      ng-repo/src/kcv_storage.rs

@ -15,15 +15,15 @@ use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Mutex; use std::sync::Mutex;
use crate::server_broker::*;
use crate::server_storage::admin::account::Account; use crate::server_storage::admin::account::Account;
use crate::server_storage::admin::invitation::Invitation; use crate::server_storage::admin::invitation::Invitation;
use crate::server_storage::admin::wallet::Wallet; use crate::server_storage::admin::wallet::Wallet;
use crate::server_storage::core::*; use crate::server_storage::core::*;
use crate::types::*; use crate::types::*;
use ng_net::server_broker::*;
use ng_net::types::*; use ng_net::types::*;
use ng_repo::errors::{ProtocolError, ServerError, StorageError}; use ng_repo::errors::{ProtocolError, ServerError, StorageError};
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::types::*; use ng_repo::types::*;
use ng_storage_rocksdb::block_storage::RocksDbBlockStorage; use ng_storage_rocksdb::block_storage::RocksDbBlockStorage;
@ -239,36 +239,163 @@ impl RocksDbServerStorage {
&self, &self,
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
user: &UserId,
) -> Result<RepoPinStatus, ServerError> { ) -> Result<RepoPinStatus, ServerError> {
Err(ServerError::False) let repo_info = RepoHashStorage::load_for_user(user, repo, overlay, &self.core_storage)?;
//TODO: implement correctly ! let mut topics = vec![];
// Ok(RepoPinStatus::V0(RepoPinStatusV0 { for topic in repo_info.topics {
// hash: repo.clone(), if let Ok(mut model) = TopicStorage::open(&topic, overlay, &self.core_storage) {
match TopicStorage::USERS.get(&mut model, user) {
// // only possible for RW overlays Err(_) => {}
// expose_outer: false, Ok(publisher) => topics.push(TopicSubRes::new_from_heads(
TopicStorage::get_all_heads(&mut model)?,
publisher,
topic,
)),
}
}
}
if topics.len() == 0 {
return Err(ServerError::False);
}
// // list of topics that are subscribed to Ok(RepoPinStatus::V0(RepoPinStatusV0 {
// topics: vec![], hash: repo.clone(),
// })) expose_outer: repo_info.expose_outer.len() > 0,
topics,
}))
} }
pub(crate) fn pin_repo( pub(crate) fn pin_repo_write(
&self, &self,
overlay: &OverlayId, overlay_access: &OverlayAccess,
repo: &RepoHash, repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>, ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>, rw_topics: &Vec<PublisherAdvert>,
overlay_root_topic: &Option<TopicId>,
expose_outer: bool,
) -> Result<RepoOpened, ServerError> { ) -> Result<RepoOpened, ServerError> {
//TODO: implement correctly ! assert!(!overlay_access.is_read_only());
let mut opened = Vec::with_capacity(ro_topics.len() + rw_topics.len());
for topic in ro_topics { let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose();
opened.push((*topic).into()); let mut inner_overlay_storage =
match OverlayStorage::open(inner_overlay, &self.core_storage) {
Err(StorageError::NotFound) => {
// inner overlay doesn't exist, we need to create it
OverlayStorage::create(
inner_overlay,
&(*overlay_access).into(),
&self.core_storage,
)?
}
Err(e) => return Err(e.into()),
Ok(os) => os,
};
// the overlay we use to store all the info is: the outer for a RW access, and the inner for a WO access.
let overlay = match inner_overlay_storage.overlay_type() {
OverlayType::Outer(_) => {
panic!("shouldnt happen: we are pinning to an inner overlay. why is it outer type?")
}
OverlayType::Inner(outer) => outer,
OverlayType::InnerOnly => inner_overlay,
}
.clone();
// if an overlay_root_topic was provided, we update it in the DB:
// this information is stored on the inner overlay record, contrary to the rest of the info below, that is stored on the outer (except for WO)
if overlay_root_topic.is_some() {
OverlayStorage::TOPIC.set(
&mut inner_overlay_storage,
overlay_root_topic.as_ref().unwrap(),
)?;
} }
// we now do the pinning :
let mut result: RepoOpened = vec![];
let mut repo_info = RepoHashStorage::open(repo, &overlay, &self.core_storage)?;
if expose_outer {
RepoHashStorage::EXPOSE_OUTER.add(&mut repo_info, user_id)?;
}
let mut rw_topics_added: HashMap<TopicId, TopicSubRes> =
HashMap::with_capacity(rw_topics.len());
for topic in rw_topics { for topic in rw_topics {
opened.push((*topic).into()); let topic_id = topic.topic_id();
let mut topic_storage =
TopicStorage::create(topic_id, &overlay, repo, &self.core_storage, true)?;
RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic_id)?;
let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, topic)?;
TopicStorage::USERS.add_or_change(&mut topic_storage, user_id, &true)?;
rw_topics_added.insert(
*topic_id,
TopicSubRes::new_from_heads(
TopicStorage::get_all_heads(&mut topic_storage)?,
true,
*topic_id,
),
);
}
for topic in ro_topics {
if rw_topics_added.contains_key(topic) {
continue;
//we do not want to add again as read_only, a topic that was just opened as RW (publisher)
}
let mut topic_storage =
TopicStorage::create(topic, &overlay, repo, &self.core_storage, true)?;
RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?;
let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?;
result.push(TopicSubRes::new_from_heads(
TopicStorage::get_all_heads(&mut topic_storage)?,
false,
*topic,
));
}
result.extend(rw_topics_added.into_values());
Ok(result)
}
pub(crate) fn pin_repo_read(
&self,
overlay: &OverlayId,
repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>,
) -> Result<RepoOpened, ServerError> {
let mut overlay_storage = OverlayStorage::open(overlay, &self.core_storage)?;
match overlay_storage.overlay_type() {
OverlayType::Outer(_) => {
let mut result: RepoOpened = vec![];
let repo_info = RepoHashStorage::load_topics(repo, overlay, &self.core_storage)?;
for topic in ro_topics {
if repo_info.topics.contains(topic) {
let mut topic_storage =
TopicStorage::open(topic, overlay, &self.core_storage)?;
let _ =
TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &false)?;
result.push(TopicSubRes::new_from_heads(
TopicStorage::get_all_heads(&mut topic_storage)?,
false,
*topic,
));
}
}
Ok(result)
}
_ => return Err(ServerError::NotFound),
} }
Ok(opened)
} }
pub(crate) fn topic_sub( pub(crate) fn topic_sub(
@ -278,7 +405,6 @@ impl RocksDbServerStorage {
topic: &TopicId, topic: &TopicId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> { ) -> Result<TopicSubRes, ServerError> {
//TODO: implement correctly !
Ok(TopicSubRes::V0(TopicSubResV0 { Ok(TopicSubRes::V0(TopicSubResV0 {
topic: topic.clone(), topic: topic.clone(),
known_heads: vec![], known_heads: vec![],

@ -66,6 +66,27 @@ pub enum OverlayType {
InnerOnly, InnerOnly,
} }
impl OverlayType {
pub fn is_inner_get_outer(&self) -> Option<&OverlayId> {
match self {
Self::Inner(outer) => Some(outer),
_ => None,
}
}
}
impl From<OverlayAccess> for OverlayType {
fn from(oa: OverlayAccess) -> OverlayType {
match oa {
OverlayAccess::ReadOnly(_) => {
panic!("cannot create an OverlayType from a ReadOnly OverlayAccess")
}
OverlayAccess::ReadWrite((inner, outer)) => OverlayType::Inner(outer),
OverlayAccess::WriteOnly(inner) => OverlayType::InnerOnly,
}
}
}
pub struct OverlayInfo { pub struct OverlayInfo {
pub overlay_type: OverlayType, pub overlay_type: OverlayType,
pub overlay_topic: Option<TopicId>, pub overlay_topic: Option<TopicId>,
@ -75,6 +96,7 @@ pub struct OverlayInfo {
pub struct ServerBroker { pub struct ServerBroker {
storage: RocksDbServerStorage, storage: RocksDbServerStorage,
overlays: HashMap<OverlayId, OverlayInfo>, overlays: HashMap<OverlayId, OverlayInfo>,
inner_overlays: HashMap<OverlayId, Option<OverlayId>>, inner_overlays: HashMap<OverlayId, Option<OverlayId>>,
} }
@ -93,6 +115,9 @@ impl ServerBroker {
} }
} }
//TODO: the purpose of this trait is to have a level of indirection so we can keep some data in memory (cache) and avoid hitting the storage backend (rocksdb) at every call.
//for now this cache is not implemented, but the structs are ready (see above), and it would just require to change slightly the implementation of the trait functions here below.
impl IServerBroker for ServerBroker { impl IServerBroker for ServerBroker {
fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> { fn next_seq_for_peer(&self, peer: &PeerId, seq: u64) -> Result<(), ServerError> {
self.storage.next_seq_for_peer(peer, seq) self.storage.next_seq_for_peer(peer, seq)
@ -136,36 +161,41 @@ impl IServerBroker for ServerBroker {
&self, &self,
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
user: &UserId,
) -> Result<RepoPinStatus, ServerError> { ) -> Result<RepoPinStatus, ServerError> {
Err(ServerError::False) self.storage.get_repo_pin_status(overlay, repo, user)
//TODO: implement correctly ! }
// Ok(RepoPinStatus::V0(RepoPinStatusV0 {
// hash: repo.clone(),
// // only possible for RW overlays
// expose_outer: false,
// // list of topics that are subscribed to fn pin_repo_write(
// topics: vec![], &self,
// })) overlay: &OverlayAccess,
repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>,
overlay_root_topic: &Option<TopicId>,
expose_outer: bool,
) -> Result<RepoOpened, ServerError> {
self.storage.pin_repo_write(
overlay,
repo,
user_id,
ro_topics,
rw_topics,
overlay_root_topic,
expose_outer,
)
} }
fn pin_repo( fn pin_repo_read(
&self, &self,
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>, ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>,
) -> Result<RepoOpened, ServerError> { ) -> Result<RepoOpened, ServerError> {
//TODO: implement correctly ! self.storage
let mut opened = Vec::with_capacity(ro_topics.len() + rw_topics.len()); .pin_repo_read(overlay, repo, user_id, ro_topics)
for topic in ro_topics {
opened.push((*topic).into());
}
for topic in rw_topics {
opened.push((*topic).into());
}
Ok(opened)
} }
fn topic_sub( fn topic_sub(
@ -173,18 +203,13 @@ impl IServerBroker for ServerBroker {
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
topic: &TopicId, topic: &TopicId,
user: &UserId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> { ) -> Result<TopicSubRes, ServerError> {
//TODO: implement correctly ! self.storage.topic_sub(overlay, repo, topic, publisher)
Ok(TopicSubRes::V0(TopicSubResV0 {
topic: topic.clone(),
known_heads: vec![],
publisher: publisher.is_some(),
}))
} }
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> { fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> {
//TODO: implement correctly ! self.storage.get_commit(overlay, id)
Ok(vec![Block::dummy()])
} }
} }

@ -115,7 +115,7 @@ impl<'a> CommitStorage<'a> {
id: &ObjectId, id: &ObjectId,
overlay: &OverlayId, overlay: &OverlayId,
event: &Option<EventInfo>, event: &Option<EventInfo>,
storage: &'a mut dyn KCVStorage, storage: &'a dyn KCVStorage,
) -> Result<CommitStorage<'a>, StorageError> { ) -> Result<CommitStorage<'a>, StorageError> {
let mut creating = CommitStorage::new(id, overlay, storage); let mut creating = CommitStorage::new(id, overlay, storage);
if creating.exists() { if creating.exists() {

@ -100,7 +100,7 @@ impl<'a> OverlayStorage<'a> {
pub fn create( pub fn create(
id: &OverlayId, id: &OverlayId,
overlay_type: &OverlayType, overlay_type: &OverlayType,
storage: &'a mut dyn KCVStorage, storage: &'a dyn KCVStorage,
) -> Result<OverlayStorage<'a>, StorageError> { ) -> Result<OverlayStorage<'a>, StorageError> {
let mut overlay = OverlayStorage::new(id, storage); let mut overlay = OverlayStorage::new(id, storage);
if overlay.exists() { if overlay.exists() {
@ -109,6 +109,18 @@ impl<'a> OverlayStorage<'a> {
overlay.overlay_type.set(overlay_type)?; overlay.overlay_type.set(overlay_type)?;
ExistentialValue::save(&overlay, overlay_type)?; ExistentialValue::save(&overlay, overlay_type)?;
if id.is_inner() {
if let Some(outer) = overlay_type.is_inner_get_outer() {
match OverlayStorage::create(outer, &OverlayType::Outer(*id), storage) {
Err(StorageError::AlreadyExists) => {
//it is ok if the Outer overlay already exists. someone else had pinned it before, in read_only, and the broker ahd subscribed to it from another broker
}
Err(e) => return Err(e), //TODO: remove the existentialvalue that was previously saved (or use a transaction)
Ok(_) => {}
}
}
}
Ok(overlay) Ok(overlay)
} }

@ -61,7 +61,6 @@ impl<'a> RepoHashStorage<'a> {
storage: &'a dyn KCVStorage, storage: &'a dyn KCVStorage,
) -> Result<RepoInfo, StorageError> { ) -> Result<RepoInfo, StorageError> {
let mut opening = Self::new(repo, overlay, storage); let mut opening = Self::new(repo, overlay, storage);
let info = RepoInfo { let info = RepoInfo {
topics: Self::TOPICS.get_all(&mut opening)?, topics: Self::TOPICS.get_all(&mut opening)?,
expose_outer: Self::EXPOSE_OUTER.get_all(&mut opening)?, expose_outer: Self::EXPOSE_OUTER.get_all(&mut opening)?,
@ -69,6 +68,37 @@ impl<'a> RepoHashStorage<'a> {
Ok(info) Ok(info)
} }
pub fn load_topics(
repo: &RepoHash,
overlay: &OverlayId,
storage: &'a dyn KCVStorage,
) -> Result<RepoInfo, StorageError> {
let mut opening = Self::new(repo, overlay, storage);
let info = RepoInfo {
topics: Self::TOPICS.get_all(&mut opening)?,
expose_outer: HashSet::new(),
};
Ok(info)
}
pub fn load_for_user(
user: &UserId,
repo: &RepoHash,
overlay: &OverlayId,
storage: &'a dyn KCVStorage,
) -> Result<RepoInfo, StorageError> {
let mut opening = Self::new(repo, overlay, storage);
let mut expose_outer = HashSet::new();
if let Ok(()) = Self::EXPOSE_OUTER.has(&mut opening, user) {
expose_outer.insert(*user);
}
let info = RepoInfo {
topics: Self::TOPICS.get_all(&mut opening)?,
expose_outer,
};
Ok(info)
}
pub fn new(repo: &RepoHash, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self { pub fn new(repo: &RepoHash, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self {
let mut key: Vec<u8> = Vec::with_capacity(33 + 33); let mut key: Vec<u8> = Vec::with_capacity(33 + 33);
key.append(&mut to_vec(overlay).unwrap()); key.append(&mut to_vec(overlay).unwrap());
@ -87,7 +117,7 @@ impl<'a> RepoHashStorage<'a> {
pub fn create( pub fn create(
repo: &RepoHash, repo: &RepoHash,
overlay: &OverlayId, overlay: &OverlayId,
storage: &'a mut dyn KCVStorage, storage: &'a dyn KCVStorage,
) -> Result<RepoHashStorage<'a>, StorageError> { ) -> Result<RepoHashStorage<'a>, StorageError> {
let mut creating = Self::new(repo, overlay, storage); let mut creating = Self::new(repo, overlay, storage);
Ok(creating) Ok(creating)

@ -109,11 +109,16 @@ impl<'a> TopicStorage<'a> {
id: &TopicId, id: &TopicId,
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
storage: &'a mut dyn KCVStorage, storage: &'a dyn KCVStorage,
or_open: bool,
) -> Result<TopicStorage<'a>, StorageError> { ) -> Result<TopicStorage<'a>, StorageError> {
let mut topic = TopicStorage::new(id, overlay, storage); let mut topic = TopicStorage::new(id, overlay, storage);
if topic.exists() { if topic.exists() {
return Err(StorageError::AlreadyExists); if or_open {
return Ok(topic);
} else {
return Err(StorageError::AlreadyExists);
}
} }
topic.repo.set(repo)?; topic.repo.set(repo)?;
ExistentialValue::save(&topic, repo)?; ExistentialValue::save(&topic, repo)?;

@ -92,7 +92,7 @@ impl EActor for Actor<'_, CommitGet, Block> {
let blocks_res = broker let blocks_res = broker
.get_server_broker()? .get_server_broker()?
.get_commit(req.overlay(), req.id()); .get_commit(req.overlay(), req.id());
// IF NEEDED, the get_commit could be changed to be async, and then the send_in_reply_to would be also totally async
match blocks_res { match blocks_res {
Ok(blocks) => { Ok(blocks) => {
if blocks.len() == 0 { if blocks.len() == 0 {

@ -25,8 +25,7 @@ impl PinRepo {
Actor::<PinRepo, RepoOpened>::new_responder(id) Actor::<PinRepo, RepoOpened>::new_responder(id)
} }
pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo { pub fn from_repo(repo: &Repo, broker_id: &DirectPeerId) -> PinRepo {
let overlay = let overlay = OverlayAccess::new_write_access_from_store(&repo.store);
OverlayAccess::ReadWrite((repo.store.inner_overlay(), repo.store.outer_overlay()));
let mut rw_topics = Vec::with_capacity(repo.branches.len()); let mut rw_topics = Vec::with_capacity(repo.branches.len());
let mut ro_topics = vec![]; let mut ro_topics = vec![];
for (_, branch) in repo.branches.iter() { for (_, branch) in repo.branches.iter() {
@ -107,18 +106,73 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> {
) -> Result<(), ProtocolError> { ) -> Result<(), ProtocolError> {
let req = PinRepo::try_from(msg)?; let req = PinRepo::try_from(msg)?;
//TODO implement all the server side logic
let broker = BROKER.read().await; let broker = BROKER.read().await;
let res = broker.get_server_broker()?.pin_repo(
req.overlay(),
req.hash(),
req.ro_topics(),
req.rw_topics(),
);
// check the validity of the PublisherAdvert(s)
let server_peer_id = broker.get_config().unwrap().peer_id;
for pub_ad in req.rw_topics() {
pub_ad.verify_for_broker(&server_peer_id)?;
}
let result = {
match req.overlay_access() {
OverlayAccess::ReadOnly(r) => {
if r.is_inner()
|| req.overlay() != r
|| req.rw_topics().len() > 0
|| req.overlay_root_topic().is_some()
{
Err(ServerError::InvalidRequest)
} else {
broker.get_server_broker()?.pin_repo_read(
req.overlay(),
req.hash(),
&fsm.lock().await.user_id_or_err()?,
req.ro_topics(),
)
}
}
OverlayAccess::ReadWrite((w, r)) => {
if req.overlay() != w
|| !w.is_inner()
|| r.is_inner()
|| req.expose_outer() && req.rw_topics().len() == 0
{
// we do not allow to expose_outer if not a publisher for at least one topic
// TODO add a check on "|| overlay_root_topic.is_none()" because it should be mandatory to have one (not sent by client at the moment)
Err(ServerError::InvalidRequest)
} else {
broker.get_server_broker()?.pin_repo_write(
req.overlay_access(),
req.hash(),
&fsm.lock().await.user_id_or_err()?,
req.ro_topics(),
req.rw_topics(),
req.overlay_root_topic(),
req.expose_outer(),
)
}
}
OverlayAccess::WriteOnly(w) => {
if !w.is_inner() || req.overlay() != w || req.expose_outer() {
Err(ServerError::InvalidRequest)
} else {
broker.get_server_broker()?.pin_repo_write(
req.overlay_access(),
req.hash(),
&fsm.lock().await.user_id_or_err()?,
req.ro_topics(),
req.rw_topics(),
req.overlay_root_topic(),
false,
)
}
}
}
};
fsm.lock() fsm.lock()
.await .await
.send_in_reply_to(res.into(), self.id()) .send_in_reply_to(result.into(), self.id())
.await?; .await?;
Ok(()) Ok(())
} }

@ -78,9 +78,11 @@ impl EActor for Actor<'_, RepoPinStatusReq, RepoPinStatus> {
) -> Result<(), ProtocolError> { ) -> Result<(), ProtocolError> {
let req = RepoPinStatusReq::try_from(msg)?; let req = RepoPinStatusReq::try_from(msg)?;
let broker = BROKER.read().await; let broker = BROKER.read().await;
let res = broker let res = broker.get_server_broker()?.get_repo_pin_status(
.get_server_broker()? req.overlay(),
.get_repo_pin_status(req.overlay(), req.hash()); req.hash(),
&fsm.lock().await.user_id_or_err()?,
);
fsm.lock() fsm.lock()
.await .await
.send_in_reply_to(res.into(), self.id()) .send_in_reply_to(res.into(), self.id())

@ -104,6 +104,7 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> {
req.overlay(), req.overlay(),
req.hash(), req.hash(),
req.topic(), req.topic(),
&fsm.lock().await.user_id_or_err()?,
req.publisher(), req.publisher(),
); );

@ -261,6 +261,13 @@ impl NoiseFSM {
} }
} }
pub fn user_id_or_err(&self) -> Result<UserId, ProtocolError> {
match &self.config {
Some(start_config) => start_config.get_user().ok_or(ProtocolError::ActorError),
_ => Err(ProtocolError::ActorError),
}
}
fn decrypt(&mut self, ciphertext: &Noise) -> Result<ProtocolMessage, ProtocolError> { fn decrypt(&mut self, ciphertext: &Noise) -> Result<ProtocolMessage, ProtocolError> {
let ser = self let ser = self
.noise_cipher_state_dec .noise_cipher_state_dec

@ -41,14 +41,26 @@ pub trait IServerBroker: Send + Sync {
&self, &self,
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
user_id: &UserId,
) -> Result<RepoPinStatus, ServerError>; ) -> Result<RepoPinStatus, ServerError>;
fn pin_repo( fn pin_repo_write(
&self, &self,
overlay: &OverlayId, overlay: &OverlayAccess,
repo: &RepoHash, repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>, ro_topics: &Vec<TopicId>,
rw_topics: &Vec<PublisherAdvert>, rw_topics: &Vec<PublisherAdvert>,
overlay_root_topic: &Option<TopicId>,
expose_outer: bool,
) -> Result<RepoOpened, ServerError>;
fn pin_repo_read(
&self,
overlay: &OverlayId,
repo: &RepoHash,
user_id: &UserId,
ro_topics: &Vec<TopicId>,
) -> Result<RepoOpened, ServerError>; ) -> Result<RepoOpened, ServerError>;
fn topic_sub( fn topic_sub(
@ -56,6 +68,7 @@ pub trait IServerBroker: Send + Sync {
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
topic: &TopicId, topic: &TopicId,
user_id: &UserId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError>; ) -> Result<TopicSubRes, ServerError>;

@ -20,8 +20,11 @@ use crate::{actor::EActor, actors::admin::*, actors::*};
use core::fmt; use core::fmt;
use ng_repo::errors::*; use ng_repo::errors::*;
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::store::Store;
use ng_repo::types::*; use ng_repo::types::*;
use ng_repo::utils::{sign, verify};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::{ use std::{
any::{Any, TypeId}, any::{Any, TypeId},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
@ -1213,6 +1216,38 @@ pub enum OverlayAccess {
} }
impl OverlayAccess { impl OverlayAccess {
pub fn is_read_only(&self) -> bool {
match self {
Self::ReadOnly(_) => true,
_ => false,
}
}
pub fn new_write_access_from_store(store: &Store) -> OverlayAccess {
match store.get_store_repo() {
StoreRepo::V0(StoreRepoV0::PrivateStore(_)) | StoreRepo::V0(StoreRepoV0::Dialog(_)) => {
OverlayAccess::WriteOnly(store.inner_overlay())
}
StoreRepo::V0(StoreRepoV0::ProtectedStore(_))
| StoreRepo::V0(StoreRepoV0::Group(_))
| StoreRepo::V0(StoreRepoV0::PublicStore(_)) => {
OverlayAccess::ReadWrite((store.inner_overlay(), store.outer_overlay()))
}
}
}
pub fn new_read_access_from_store(store: &Store) -> OverlayAccess {
match store.get_store_repo() {
StoreRepo::V0(StoreRepoV0::PrivateStore(_)) | StoreRepo::V0(StoreRepoV0::Dialog(_)) => {
panic!("cannot get read access to a private or dialog store");
}
StoreRepo::V0(StoreRepoV0::ProtectedStore(_))
| StoreRepo::V0(StoreRepoV0::Group(_))
| StoreRepo::V0(StoreRepoV0::PublicStore(_)) => {
OverlayAccess::ReadOnly(store.outer_overlay())
}
}
}
pub fn new_ro(outer_overlay: OverlayId) -> Result<Self, NgError> { pub fn new_ro(outer_overlay: OverlayId) -> Result<Self, NgError> {
if let OverlayId::Outer(_digest) = outer_overlay { if let OverlayId::Outer(_digest) = outer_overlay {
Ok(OverlayAccess::ReadOnly(outer_overlay)) Ok(OverlayAccess::ReadOnly(outer_overlay))
@ -1479,8 +1514,6 @@ pub enum PublisherAdvert {
V0(PublisherAdvertV0), V0(PublisherAdvertV0),
} }
use ng_repo::utils::sign;
impl PublisherAdvert { impl PublisherAdvert {
pub fn new( pub fn new(
topic_id: TopicId, topic_id: TopicId,
@ -1500,6 +1533,27 @@ impl PublisherAdvert {
Self::V0(v0) => &v0.content.topic, Self::V0(v0) => &v0.content.topic,
} }
} }
pub fn verify(&self) -> Result<(), NgError> {
match self {
Self::V0(v0) => verify(
&serde_bare::to_vec(&v0.content).unwrap(),
v0.sig,
v0.content.topic,
),
}
}
pub fn verify_for_broker(&self, peer_id: &DirectPeerId) -> Result<(), ProtocolError> {
match self {
Self::V0(v0) => {
if v0.content.peer != *peer_id {
return Err(ProtocolError::InvalidPublisherAdvert);
}
}
}
Ok(self.verify()?)
}
} }
/// Topic subscription request by a peer /// Topic subscription request by a peer
@ -2684,6 +2738,23 @@ impl PinRepo {
PinRepo::V0(o) => &o.overlay.overlay_id_for_client_protocol_purpose(), PinRepo::V0(o) => &o.overlay.overlay_id_for_client_protocol_purpose(),
} }
} }
pub fn overlay_access(&self) -> &OverlayAccess {
match self {
PinRepo::V0(o) => &o.overlay,
}
}
pub fn overlay_root_topic(&self) -> &Option<TopicId> {
match self {
PinRepo::V0(o) => &o.overlay_root_topic,
}
}
pub fn expose_outer(&self) -> bool {
match self {
PinRepo::V0(o) => o.expose_outer,
}
}
} }
/// Request to refresh the Pinning of a previously pinned repo. /// Request to refresh the Pinning of a previously pinned repo.
@ -3244,6 +3315,13 @@ impl TopicSubRes {
Self::V0(v0) => v0.publisher, Self::V0(v0) => v0.publisher,
} }
} }
pub fn new_from_heads(topics: HashSet<ObjectId>, publisher: bool, topic: TopicId) -> Self {
TopicSubRes::V0(TopicSubResV0 {
topic,
known_heads: topics.into_iter().collect(),
publisher,
})
}
} }
impl From<TopicId> for TopicSubRes { impl From<TopicId> for TopicSubRes {

@ -215,6 +215,28 @@ pub enum ServerError {
RepoAlreadyOpened, RepoAlreadyOpened,
NotFound, NotFound,
EmptyStream, EmptyStream,
StorageError,
InvalidRequest,
InvalidSignature,
OtherError,
}
impl From<StorageError> for ServerError {
fn from(e: StorageError) -> Self {
match e {
StorageError::NotFound => ServerError::NotFound,
_ => ServerError::StorageError,
}
}
}
impl From<NgError> for ServerError {
fn from(e: NgError) -> Self {
match e {
NgError::InvalidSignature => ServerError::InvalidSignature,
_ => ServerError::OtherError,
}
}
} }
impl ServerError { impl ServerError {
@ -318,7 +340,6 @@ pub enum ProtocolError {
WsError, WsError,
ActorError, ActorError,
InvalidState, InvalidState,
SignatureError,
InvalidSignature, InvalidSignature,
SerializationError, SerializationError,
AccessDenied, AccessDenied,
@ -330,6 +351,7 @@ pub enum ProtocolError {
InvalidValue, InvalidValue,
AlreadyExists, AlreadyExists,
RepoIdRequired, RepoIdRequired,
InvalidPublisherAdvert,
ConnectionError, ConnectionError,
Timeout, Timeout,

@ -229,6 +229,22 @@ impl<
let key = Self::compute_key(model, column)?; let key = Self::compute_key(model, column)?;
model.storage().put(self.prefix, &key, None, &vec![], &None) model.storage().put(self.prefix, &key, None, &vec![], &None)
} }
pub fn add_lazy(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
let key = Self::compute_key(model, column)?;
model.storage().write_transaction(&mut |tx| {
match tx.has_property_value(self.prefix, &key, None, &vec![], &None) {
Ok(_) => {}
Err(StorageError::NotFound) => {
tx.put(self.prefix, &key, None, &vec![], &None)?;
}
Err(e) => return Err(e),
};
Ok(())
})
}
pub fn remove(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> { pub fn remove(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?; model.check_exists()?;
let key = Self::compute_key(model, column)?; let key = Self::compute_key(model, column)?;
@ -281,7 +297,7 @@ impl<
pub struct MultiMapColumn< pub struct MultiMapColumn<
Model: IModel, Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>, Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>,
Value: Serialize + for<'a> Deserialize<'a>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq,
> { > {
prefix: u8, prefix: u8,
phantom_column: PhantomData<Column>, phantom_column: PhantomData<Column>,
@ -293,7 +309,7 @@ pub struct MultiMapColumn<
impl< impl<
Model: IModel, Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>,
Value: Serialize + for<'a> Deserialize<'a>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq,
> MultiMapColumn<Model, Column, Value> > MultiMapColumn<Model, Column, Value>
{ {
pub const fn new(prefix: u8) -> Self { pub const fn new(prefix: u8) -> Self {
@ -351,14 +367,60 @@ impl<
.has_property_value(self.prefix, &key, None, &to_vec(value)?, &None) .has_property_value(self.prefix, &key, None, &to_vec(value)?, &None)
} }
pub fn has_regardless_value( pub fn get(&self, model: &mut Model, column: &Column) -> Result<Value, StorageError> {
model.check_exists()?;
let key = MultiValueColumn::compute_key(model, column)?;
let val_ser = model.storage().get(self.prefix, &key, None, &None)?;
Ok(from_slice(&val_ser)?)
}
pub fn get_or_add(
&self,
model: &mut Model,
column: &Column,
value: &Value,
) -> Result<Value, StorageError> {
model.check_exists()?;
let key = MultiValueColumn::compute_key(model, column)?;
let mut found: Option<Value> = None;
model.storage().write_transaction(&mut |tx| {
found = match tx.get(self.prefix, &key, None, &None) {
Ok(val_ser) => Some(from_slice(&val_ser)?),
Err(StorageError::NotFound) => {
tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?;
None
}
Err(e) => return Err(e),
};
Ok(())
})?;
Ok(found.unwrap_or(value.clone()))
}
pub fn add_or_change(
&self, &self,
model: &mut Model, model: &mut Model,
column: &Column, column: &Column,
value: &Value,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
model.check_exists()?; model.check_exists()?;
let key = MultiValueColumn::compute_key(model, column)?; let key = MultiValueColumn::compute_key(model, column)?;
model.storage().get(self.prefix, &key, None, &None)?; let mut found: Option<Value> = None;
model.storage().write_transaction(&mut |tx| {
found = match tx.get(self.prefix, &key, None, &None) {
Ok(val_ser) => Some(from_slice(&val_ser)?),
Err(StorageError::NotFound) => {
tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?;
None
}
Err(e) => return Err(e),
};
if found.is_some() && found.as_ref().unwrap() != value {
// we change it
tx.put(self.prefix, &key, None, &to_vec(value)?, &None)?;
}
Ok(())
})?;
Ok(()) Ok(())
} }
@ -387,7 +449,7 @@ impl<
impl< impl<
Model: IModel, Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>, Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>,
Value: Serialize + for<'a> Deserialize<'a>, Value: Serialize + for<'a> Deserialize<'a> + Clone + PartialEq,
> IMultiValueColumn for MultiMapColumn<Model, Column, Value> > IMultiValueColumn for MultiMapColumn<Model, Column, Value>
{ {
fn value_size(&self) -> Result<usize, StorageError> { fn value_size(&self) -> Result<usize, StorageError> {
@ -526,7 +588,7 @@ pub struct SingleValueColumn<Model: IModel, Value: Serialize + for<'a> Deseriali
phantom_model: PhantomData<Model>, phantom_model: PhantomData<Model>,
} }
impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> ISingleValueColumn impl<Model: IModel, Value: Clone + Serialize + for<'d> Deserialize<'d>> ISingleValueColumn
for SingleValueColumn<Model, Value> for SingleValueColumn<Model, Value>
{ {
fn suffix(&self) -> u8 { fn suffix(&self) -> u8 {
@ -534,7 +596,9 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> ISingleValueColu
} }
} }
impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColumn<Model, Value> { impl<Model: IModel, Value: Clone + Serialize + for<'d> Deserialize<'d>>
SingleValueColumn<Model, Value>
{
pub const fn new(suffix: u8) -> Self { pub const fn new(suffix: u8) -> Self {
SingleValueColumn { SingleValueColumn {
suffix, suffix,
@ -553,6 +617,7 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColum
&None, &None,
) )
} }
pub fn get(&self, model: &mut Model) -> Result<Value, StorageError> { pub fn get(&self, model: &mut Model) -> Result<Value, StorageError> {
model.check_exists()?; model.check_exists()?;
match model match model
@ -564,6 +629,29 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColum
} }
} }
pub fn get_or_set(&self, model: &mut Model, value: &Value) -> Result<Value, StorageError> {
model.check_exists()?;
let mut found: Option<Value> = None;
model.storage().write_transaction(&mut |tx| {
found = match tx.get(model.prefix(), model.key(), Some(self.suffix), &None) {
Ok(val_ser) => Some(from_slice(&val_ser)?),
Err(StorageError::NotFound) => {
tx.put(
model.prefix(),
model.key(),
Some(self.suffix),
&to_vec(value)?,
&None,
)?;
None
}
Err(e) => return Err(e),
};
Ok(())
})?;
Ok(found.unwrap_or(value.clone()))
}
pub fn has(&self, model: &mut Model, value: &Value) -> Result<(), StorageError> { pub fn has(&self, model: &mut Model, value: &Value) -> Result<(), StorageError> {
model.check_exists()?; model.check_exists()?;
model.storage().has_property_value( model.storage().has_property_value(
@ -575,14 +663,12 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColum
) )
} }
// should call the Model::del() instead pub fn del(&self, model: &mut Model) -> Result<(), StorageError> {
// pub fn del( model.check_exists()?;
// &self, model
// model: &mut Model, .storage()
// tx: &mut dyn WriteTransaction, .del(model.prefix(), model.key(), Some(self.suffix), &None)
// ) -> Result<(), StorageError> { }
// tx.del(model.prefix(), model.key(), Some(self.suffix), &None)
// }
} }
pub struct ExistentialValue<Column: Serialize + for<'d> Deserialize<'d>> { pub struct ExistentialValue<Column: Serialize + for<'d> Deserialize<'d>> {

Loading…
Cancel
Save