pull/19/head
Niko PLP 6 months ago
parent 1fa0eb0dc7
commit 776c88e3ae
  1. 58
      ng-broker/src/rocksdb_server_storage.rs
  2. 3
      ng-broker/src/server_broker.rs
  3. 2
      ng-net/src/actors/client/pin_repo.rs
  4. 8
      ng-net/src/actors/client/topic_sub.rs
  5. 3
      ng-repo/src/errors.rs
  6. 7
      ng-repo/src/types.rs

@ -278,6 +278,8 @@ impl RocksDbServerStorage {
) -> Result<RepoOpened, ServerError> { ) -> Result<RepoOpened, ServerError> {
assert!(!overlay_access.is_read_only()); assert!(!overlay_access.is_read_only());
// TODO: all the below DB operations should be done inside a single transaction. need refactor of Object-KCV-Mapping to take an optional transaction.
let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose(); let inner_overlay = overlay_access.overlay_id_for_client_protocol_purpose();
let mut inner_overlay_storage = let mut inner_overlay_storage =
match OverlayStorage::open(inner_overlay, &self.core_storage) { match OverlayStorage::open(inner_overlay, &self.core_storage) {
@ -403,13 +405,59 @@ impl RocksDbServerStorage {
overlay: &OverlayId, overlay: &OverlayId,
repo: &RepoHash, repo: &RepoHash,
topic: &TopicId, topic: &TopicId,
user_id: &UserId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> { ) -> Result<TopicSubRes, ServerError> {
Ok(TopicSubRes::V0(TopicSubResV0 { let mut overlay_storage =
topic: topic.clone(), OverlayStorage::open(overlay, &self.core_storage).map_err(|e| match e {
known_heads: vec![], StorageError::NotFound => ServerError::OverlayNotFound,
publisher: publisher.is_some(), _ => e.into(),
})) })?;
let overlay = match overlay_storage.overlay_type() {
OverlayType::Outer(_) => {
if overlay.is_outer() {
overlay
} else {
return Err(ServerError::OverlayMismatch);
}
}
OverlayType::Inner(outer) => {
if outer.is_outer() {
outer
} else {
return Err(ServerError::OverlayMismatch);
}
}
OverlayType::InnerOnly => {
if overlay.is_inner() {
overlay
} else {
return Err(ServerError::OverlayMismatch);
}
}
};
// now we check that the repo was previously pinned.
// if it was opened but not pinned, then this should be deal with in the ServerBroker, in memory, not here)
let is_publisher = publisher.is_some();
// (we already checked that the advert is valid)
let mut topic_storage =
TopicStorage::create(topic, overlay, repo, &self.core_storage, true)?;
let _ = TopicStorage::USERS.get_or_add(&mut topic_storage, user_id, &is_publisher)?;
if is_publisher {
let _ = TopicStorage::ADVERT.get_or_set(&mut topic_storage, publisher.unwrap())?;
}
let mut repo_info = RepoHashStorage::open(repo, overlay, &self.core_storage)?;
RepoHashStorage::TOPICS.add_lazy(&mut repo_info, topic)?;
Ok(TopicSubRes::new_from_heads(
TopicStorage::get_all_heads(&mut topic_storage)?,
is_publisher,
*topic,
))
} }
pub(crate) fn get_commit( pub(crate) fn get_commit(

@ -206,7 +206,8 @@ impl IServerBroker for ServerBroker {
user: &UserId, user: &UserId,
publisher: Option<&PublisherAdvert>, publisher: Option<&PublisherAdvert>,
) -> Result<TopicSubRes, ServerError> { ) -> Result<TopicSubRes, ServerError> {
self.storage.topic_sub(overlay, repo, topic, publisher) self.storage
.topic_sub(overlay, repo, topic, user, publisher)
} }
fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> { fn get_commit(&self, overlay: &OverlayId, id: &ObjectId) -> Result<Vec<Block>, ServerError> {

@ -108,7 +108,7 @@ impl EActor for Actor<'_, PinRepo, RepoOpened> {
let broker = BROKER.read().await; let broker = BROKER.read().await;
// check the validity of the PublisherAdvert(s) // check the validity of the PublisherAdvert(s). this will return a ProtocolError (will close the connection)
let server_peer_id = broker.get_config().unwrap().peer_id; let server_peer_id = broker.get_config().unwrap().peer_id;
for pub_ad in req.rw_topics() { for pub_ad in req.rw_topics() {
pub_ad.verify_for_broker(&server_peer_id)?; pub_ad.verify_for_broker(&server_peer_id)?;

@ -98,8 +98,14 @@ impl EActor for Actor<'_, TopicSub, TopicSubRes> {
) -> Result<(), ProtocolError> { ) -> Result<(), ProtocolError> {
let req = TopicSub::try_from(msg)?; let req = TopicSub::try_from(msg)?;
//TODO implement all the server side logic
let broker = BROKER.read().await; let broker = BROKER.read().await;
// check the validity of the PublisherAdvert. this will return a ProtocolError (will close the connection)
if let Some(advert) = req.publisher() {
let server_peer_id = broker.get_config().unwrap().peer_id;
advert.verify_for_broker(&server_peer_id)?;
}
let res = broker.get_server_broker()?.topic_sub( let res = broker.get_server_broker()?.topic_sub(
req.overlay(), req.overlay(),
req.hash(), req.hash(),

@ -219,6 +219,9 @@ pub enum ServerError {
InvalidRequest, InvalidRequest,
InvalidSignature, InvalidSignature,
OtherError, OtherError,
OverlayMismatch,
OverlayNotFound,
TopicNotFound,
} }
impl From<StorageError> for ServerError { impl From<StorageError> for ServerError {

@ -575,6 +575,13 @@ impl OverlayId {
_ => false, _ => false,
} }
} }
pub fn is_outer(&self) -> bool {
match self {
Self::Outer(_) => true,
_ => false,
}
}
} }
/// List of Store Overlay types /// List of Store Overlay types

Loading…
Cancel
Save