parent
fe4a41b3c2
commit
c8b7a04ab4
@ -0,0 +1,102 @@ |
||||
/* |
||||
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers |
||||
* All rights reserved. |
||||
* Licensed under the Apache License, Version 2.0 |
||||
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
* at your option. All files in the project carrying such |
||||
* notice may not be copied, modified, or distributed except |
||||
* according to those terms. |
||||
*/ |
||||
use crate::broker::{ServerConfig, BROKER}; |
||||
use crate::connection::NoiseFSM; |
||||
use crate::types::*; |
||||
use crate::{actor::*, types::ProtocolMessage}; |
||||
use async_std::sync::Mutex; |
||||
use ng_repo::errors::*; |
||||
use ng_repo::log::*; |
||||
use ng_repo::types::PubKey; |
||||
use serde::{Deserialize, Serialize}; |
||||
use std::sync::Arc; |
||||
|
||||
impl BlocksExist { |
||||
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> { |
||||
Actor::<BlocksExist, BlocksFound>::new_responder(id) |
||||
} |
||||
} |
||||
|
||||
impl TryFrom<ProtocolMessage> for BlocksExist { |
||||
type Error = ProtocolError; |
||||
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> { |
||||
let req: ClientRequestContentV0 = msg.try_into()?; |
||||
if let ClientRequestContentV0::BlocksExist(a) = req { |
||||
Ok(a) |
||||
} else { |
||||
log_debug!("INVALID {:?}", req); |
||||
Err(ProtocolError::InvalidValue) |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl From<BlocksExist> for ProtocolMessage { |
||||
fn from(msg: BlocksExist) -> ProtocolMessage { |
||||
let overlay = *msg.overlay(); |
||||
ProtocolMessage::from_client_request_v0(ClientRequestContentV0::BlocksExist(msg), overlay) |
||||
} |
||||
} |
||||
|
||||
impl TryFrom<ProtocolMessage> for BlocksFound { |
||||
type Error = ProtocolError; |
||||
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> { |
||||
let res: ClientResponseContentV0 = msg.try_into()?; |
||||
if let ClientResponseContentV0::BlocksFound(a) = res { |
||||
Ok(a) |
||||
} else { |
||||
log_debug!("INVALID {:?}", res); |
||||
Err(ProtocolError::InvalidValue) |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl From<BlocksFound> for ProtocolMessage { |
||||
fn from(b: BlocksFound) -> ProtocolMessage { |
||||
ClientResponseContentV0::BlocksFound(b).into() |
||||
} |
||||
} |
||||
|
||||
impl Actor<'_, BlocksExist, BlocksFound> {} |
||||
|
||||
#[async_trait::async_trait] |
||||
impl EActor for Actor<'_, BlocksExist, BlocksFound> { |
||||
async fn respond( |
||||
&mut self, |
||||
msg: ProtocolMessage, |
||||
fsm: Arc<Mutex<NoiseFSM>>, |
||||
) -> Result<(), ProtocolError> { |
||||
let req = BlocksExist::try_from(msg)?; |
||||
let broker = BROKER.read().await; |
||||
|
||||
let overlay = req.overlay().clone(); |
||||
let mut found = vec![]; |
||||
let mut missing = vec![]; |
||||
match req { |
||||
BlocksExist::V0(v0) => { |
||||
for block_id in v0.blocks { |
||||
let r = broker.get_server_broker()?.has_block(&overlay, &block_id); |
||||
if r.is_err() { |
||||
missing.push(block_id); |
||||
} else { |
||||
found.push(block_id); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
let res = Ok(BlocksFound::V0(BlocksFoundV0 { found, missing })); |
||||
|
||||
fsm.lock() |
||||
.await |
||||
.send_in_reply_to(res.into(), self.id()) |
||||
.await?; |
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,129 @@ |
||||
/* |
||||
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers |
||||
* All rights reserved. |
||||
* Licensed under the Apache License, Version 2.0 |
||||
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
* at your option. All files in the project carrying such |
||||
* notice may not be copied, modified, or distributed except |
||||
* according to those terms. |
||||
*/ |
||||
use crate::broker::{ServerConfig, BROKER}; |
||||
use crate::connection::NoiseFSM; |
||||
use crate::server_broker::IServerBroker; |
||||
use crate::types::*; |
||||
use crate::{actor::*, types::ProtocolMessage}; |
||||
use async_recursion::async_recursion; |
||||
use async_std::sync::{Mutex, MutexGuard}; |
||||
use ng_repo::errors::*; |
||||
use ng_repo::log::*; |
||||
use ng_repo::types::{Block, BlockId, OverlayId, PubKey}; |
||||
use serde::{Deserialize, Serialize}; |
||||
use std::sync::Arc; |
||||
|
||||
impl BlocksGet { |
||||
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> { |
||||
Actor::<BlocksGet, Block>::new_responder(id) |
||||
} |
||||
|
||||
pub fn overlay(&self) -> &OverlayId { |
||||
match self { |
||||
Self::V0(v0) => v0.overlay.as_ref().unwrap(), |
||||
} |
||||
} |
||||
pub fn set_overlay(&mut self, overlay: OverlayId) { |
||||
match self { |
||||
Self::V0(v0) => v0.overlay = Some(overlay), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl TryFrom<ProtocolMessage> for BlocksGet { |
||||
type Error = ProtocolError; |
||||
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> { |
||||
let req: ClientRequestContentV0 = msg.try_into()?; |
||||
if let ClientRequestContentV0::BlocksGet(a) = req { |
||||
Ok(a) |
||||
} else { |
||||
log_debug!("INVALID {:?}", req); |
||||
Err(ProtocolError::InvalidValue) |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl From<BlocksGet> for ProtocolMessage { |
||||
fn from(msg: BlocksGet) -> ProtocolMessage { |
||||
let overlay = *msg.overlay(); |
||||
ProtocolMessage::from_client_request_v0(ClientRequestContentV0::BlocksGet(msg), overlay) |
||||
} |
||||
} |
||||
|
||||
impl Actor<'_, BlocksGet, Block> {} |
||||
|
||||
#[async_trait::async_trait] |
||||
impl EActor for Actor<'_, BlocksGet, Block> { |
||||
async fn respond( |
||||
&mut self, |
||||
msg: ProtocolMessage, |
||||
fsm: Arc<Mutex<NoiseFSM>>, |
||||
) -> Result<(), ProtocolError> { |
||||
let req = BlocksGet::try_from(msg)?; |
||||
let broker = BROKER.read().await; |
||||
let server = broker.get_server_broker()?; |
||||
let mut lock = fsm.lock().await; |
||||
let mut something_was_sent = false; |
||||
|
||||
#[async_recursion] |
||||
async fn process_children( |
||||
children: &Vec<BlockId>, |
||||
server: &Box<dyn IServerBroker + Send + Sync>, |
||||
overlay: &OverlayId, |
||||
lock: &mut MutexGuard<'_, NoiseFSM>, |
||||
req_id: i64, |
||||
include_children: bool, |
||||
something_was_sent: &mut bool, |
||||
) { |
||||
for block_id in children { |
||||
if let Ok(block) = server.get_block(overlay, block_id) { |
||||
let grand_children = block.children().to_vec(); |
||||
if let Err(_) = lock.send_in_reply_to(block.into(), req_id).await { |
||||
break; |
||||
} |
||||
*something_was_sent = true; |
||||
if include_children { |
||||
process_children( |
||||
&grand_children, |
||||
server, |
||||
overlay, |
||||
lock, |
||||
req_id, |
||||
include_children, |
||||
something_was_sent, |
||||
) |
||||
.await; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
process_children( |
||||
req.ids(), |
||||
server, |
||||
req.overlay(), |
||||
&mut lock, |
||||
self.id(), |
||||
req.include_children(), |
||||
&mut something_was_sent, |
||||
) |
||||
.await; |
||||
|
||||
if !something_was_sent { |
||||
let re: Result<(), ServerError> = Err(ServerError::NotFound); |
||||
lock.send_in_reply_to(re.into(), self.id()).await?; |
||||
} else { |
||||
let re: Result<(), ServerError> = Err(ServerError::EndOfStream); |
||||
lock.send_in_reply_to(re.into(), self.id()).await?; |
||||
} |
||||
|
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,79 @@ |
||||
/* |
||||
* Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers |
||||
* All rights reserved. |
||||
* Licensed under the Apache License, Version 2.0 |
||||
* <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
* or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
* at your option. All files in the project carrying such |
||||
* notice may not be copied, modified, or distributed except |
||||
* according to those terms. |
||||
*/ |
||||
use crate::broker::{ServerConfig, BROKER}; |
||||
use crate::connection::NoiseFSM; |
||||
use crate::types::*; |
||||
use crate::{actor::*, types::ProtocolMessage}; |
||||
use async_std::sync::Mutex; |
||||
use ng_repo::errors::*; |
||||
use ng_repo::log::*; |
||||
use ng_repo::types::PubKey; |
||||
use serde::{Deserialize, Serialize}; |
||||
use std::sync::Arc; |
||||
|
||||
impl BlocksPut { |
||||
pub fn get_actor(&self, id: i64) -> Box<dyn EActor> { |
||||
Actor::<BlocksPut, ()>::new_responder(id) |
||||
} |
||||
} |
||||
|
||||
impl TryFrom<ProtocolMessage> for BlocksPut { |
||||
type Error = ProtocolError; |
||||
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> { |
||||
let req: ClientRequestContentV0 = msg.try_into()?; |
||||
if let ClientRequestContentV0::BlocksPut(a) = req { |
||||
Ok(a) |
||||
} else { |
||||
log_debug!("INVALID {:?}", req); |
||||
Err(ProtocolError::InvalidValue) |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl From<BlocksPut> for ProtocolMessage { |
||||
fn from(msg: BlocksPut) -> ProtocolMessage { |
||||
let overlay = *msg.overlay(); |
||||
ProtocolMessage::from_client_request_v0(ClientRequestContentV0::BlocksPut(msg), overlay) |
||||
} |
||||
} |
||||
|
||||
impl Actor<'_, BlocksPut, ()> {} |
||||
|
||||
#[async_trait::async_trait] |
||||
impl EActor for Actor<'_, BlocksPut, ()> { |
||||
async fn respond( |
||||
&mut self, |
||||
msg: ProtocolMessage, |
||||
fsm: Arc<Mutex<NoiseFSM>>, |
||||
) -> Result<(), ProtocolError> { |
||||
let req = BlocksPut::try_from(msg)?; |
||||
let broker = BROKER.read().await; |
||||
let mut res: Result<(), ServerError> = Ok(()); |
||||
let overlay = req.overlay().clone(); |
||||
match req { |
||||
BlocksPut::V0(v0) => { |
||||
for block in v0.blocks { |
||||
let r = broker.get_server_broker()?.put_block(&overlay, block); |
||||
if r.is_err() { |
||||
res = r; |
||||
break; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
fsm.lock() |
||||
.await |
||||
.send_in_reply_to(res.into(), self.id()) |
||||
.await?; |
||||
Ok(()) |
||||
} |
||||
} |
@ -0,0 +1,220 @@ |
||||
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
|
||||
// All rights reserved.
|
||||
// Licensed under the Apache License, Version 2.0
|
||||
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
|
||||
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
|
||||
// at your option. All files in the project carrying such
|
||||
// notice may not be copied, modified, or distributed except
|
||||
// according to those terms.
|
||||
|
||||
//! Processor for each type of AppRequest
|
||||
|
||||
use futures::channel::mpsc; |
||||
use futures::SinkExt; |
||||
use futures::StreamExt; |
||||
use ng_net::utils::ResultSend; |
||||
use std::sync::Arc; |
||||
|
||||
use crate::types::*; |
||||
|
||||
use crate::verifier::*; |
||||
|
||||
use ng_net::utils::{spawn_and_log_error, Receiver, Sender}; |
||||
|
||||
use ng_repo::errors::*; |
||||
use ng_repo::file::{RandomAccessFile, ReadFile}; |
||||
use ng_repo::types::BranchId; |
||||
use ng_repo::types::*; |
||||
|
||||
use ng_repo::log::*; |
||||
use ng_repo::types::StoreRepo; |
||||
|
||||
impl AppRequestCommandV0 { |
||||
pub(crate) async fn process_stream( |
||||
&self, |
||||
verifier: &mut Verifier, |
||||
nuri: &NuriV0, |
||||
payload: &Option<AppRequestPayload>, |
||||
) -> Result<(Receiver<AppResponse>, CancelFn), NgError> { |
||||
match self { |
||||
Self::Fetch(fetch) => match fetch { |
||||
AppFetchContentV0::Subscribe => { |
||||
let (_, branch_id, _) = |
||||
Self::open_for_target(verifier, &nuri.target, false).await?; |
||||
Ok(verifier.create_branch_subscription(branch_id).await?) |
||||
} |
||||
_ => unimplemented!(), |
||||
}, |
||||
Self::FileGet => { |
||||
if nuri.access.len() < 1 || nuri.object.is_none() { |
||||
return Err(NgError::InvalidArgument); |
||||
} |
||||
let (repo_id, _, store_repo) = Self::resolve_target(verifier, &nuri.target)?; |
||||
let access = nuri.access.get(0).unwrap(); |
||||
if let NgAccessV0::Key(key) = access { |
||||
let repo = verifier.get_repo(&repo_id, &store_repo)?; |
||||
let obj_id = nuri.object.unwrap(); |
||||
if let Some(mut stream) = verifier |
||||
.fetch_blocks_if_needed(&obj_id, &repo_id, &store_repo) |
||||
.await? |
||||
{ |
||||
// TODO: start opening the file and running the sending_loop after we received 10 (3 mandatory and 7 depths max) blocks.
|
||||
// for files below 10MB we wont see a difference, but for big files, we can start sending out some AppResponse earlier.
|
||||
while let Some(block) = stream.next().await { |
||||
repo.store.put(&block)?; |
||||
} |
||||
} |
||||
let file = |
||||
RandomAccessFile::open(obj_id, key.clone(), Arc::clone(&repo.store))?; |
||||
|
||||
let (mut tx, rx) = mpsc::unbounded::<AppResponse>(); |
||||
tx.send(AppResponse::V0(AppResponseV0::FileMeta(FileMetaV0 { |
||||
content_type: file.meta().content_type().clone(), |
||||
size: file.meta().total_size(), |
||||
}))) |
||||
.await |
||||
.map_err(|_| NgError::InternalError)?; |
||||
|
||||
async fn sending_loop( |
||||
file: Arc<RandomAccessFile>, |
||||
mut tx: Sender<AppResponse>, |
||||
) -> ResultSend<()> { |
||||
let mut pos = 0; |
||||
loop { |
||||
let res = file.read(pos, 1048564); |
||||
|
||||
if res.is_err() { |
||||
//log_info!("ERR={:?}", res.unwrap_err());
|
||||
let _ = tx |
||||
.send(AppResponse::V0(AppResponseV0::FileBinary(vec![]))) |
||||
.await; |
||||
break; |
||||
} |
||||
let res = res.unwrap(); |
||||
//log_info!("reading={} {}", pos, res.len());
|
||||
pos += res.len(); |
||||
if let Err(_) = tx |
||||
.send(AppResponse::V0(AppResponseV0::FileBinary(res))) |
||||
.await |
||||
{ |
||||
break; |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
spawn_and_log_error(sending_loop(Arc::new(file), tx.clone())); |
||||
let fnonce = Box::new(move || { |
||||
tx.close_channel(); |
||||
}); |
||||
Ok((rx, fnonce)) |
||||
} else { |
||||
return Err(NgError::InvalidArgument); |
||||
} |
||||
} |
||||
_ => unimplemented!(), |
||||
} |
||||
} |
||||
|
||||
fn resolve_target( |
||||
verifier: &mut Verifier, |
||||
target: &NuriTargetV0, |
||||
) -> Result<(RepoId, BranchId, StoreRepo), NgError> { |
||||
match target { |
||||
NuriTargetV0::PrivateStore => { |
||||
let repo_id = verifier.config.private_store_id.unwrap(); |
||||
let (branch, store_repo) = { |
||||
let repo = verifier.repos.get(&repo_id).ok_or(NgError::RepoNotFound)?; |
||||
let branch = repo.main_branch().ok_or(NgError::BranchNotFound)?; |
||||
(branch.id, repo.store.get_store_repo().clone()) |
||||
}; |
||||
Ok((repo_id, branch, store_repo)) |
||||
} |
||||
_ => unimplemented!(), |
||||
} |
||||
} |
||||
|
||||
async fn open_for_target( |
||||
verifier: &mut Verifier, |
||||
target: &NuriTargetV0, |
||||
as_publisher: bool, |
||||
) -> Result<(RepoId, BranchId, StoreRepo), NgError> { |
||||
let (repo_id, branch, store_repo) = Self::resolve_target(verifier, target)?; |
||||
verifier |
||||
.open_branch(&repo_id, &branch, as_publisher) |
||||
.await?; |
||||
Ok((repo_id, branch, store_repo)) |
||||
} |
||||
|
||||
pub(crate) async fn process( |
||||
&self, |
||||
verifier: &mut Verifier, |
||||
nuri: NuriV0, |
||||
payload: Option<AppRequestPayload>, |
||||
) -> Result<AppResponse, NgError> { |
||||
match self { |
||||
Self::FilePut => match payload { |
||||
None => return Err(NgError::InvalidPayload), |
||||
Some(AppRequestPayload::V0(v0)) => match v0 { |
||||
AppRequestPayloadV0::AddFile(add) => { |
||||
let (repo_id, branch, store_repo) = |
||||
Self::open_for_target(verifier, &nuri.target, true).await?; |
||||
//log_info!("GOT ADD FILE {:?}", add);
|
||||
let repo = verifier.get_repo(&repo_id, &store_repo)?; |
||||
// check that the referenced object exists locally.
|
||||
repo.store.has(&add.object.id)?; |
||||
// we send all the blocks to the broker.
|
||||
let file = RandomAccessFile::open( |
||||
add.object.id.clone(), |
||||
add.object.key.clone(), |
||||
Arc::clone(&repo.store), |
||||
)?; |
||||
let blocks = file.get_all_blocks_ids()?; |
||||
let found = verifier.has_blocks(blocks, repo).await?; |
||||
for block_id in found.missing() { |
||||
let block = repo.store.get(block_id)?; |
||||
verifier.put_blocks(vec![block], repo).await?; |
||||
} |
||||
|
||||
let add_file_commit_body = CommitBodyV0::AddFile(AddFile::V0(AddFileV0 { |
||||
name: add.filename, |
||||
metadata: vec![], |
||||
})); |
||||
|
||||
verifier |
||||
.new_commit( |
||||
add_file_commit_body, |
||||
&repo_id, |
||||
&branch, |
||||
&store_repo, |
||||
&vec![], |
||||
vec![], |
||||
vec![add.object], |
||||
) |
||||
.await?; |
||||
} |
||||
AppRequestPayloadV0::SmallFilePut(small) => {} |
||||
AppRequestPayloadV0::RandomAccessFilePut(content_type) => { |
||||
let (repo_id, _, store_repo) = |
||||
Self::resolve_target(verifier, &nuri.target)?; |
||||
let repo = verifier.get_repo(&repo_id, &store_repo)?; |
||||
let id = verifier.start_upload(content_type, Arc::clone(&repo.store)); |
||||
return Ok(AppResponse::V0(AppResponseV0::FileUploading(id))); |
||||
} |
||||
AppRequestPayloadV0::RandomAccessFilePutChunk((id, chunk)) => { |
||||
if chunk.len() > 0 { |
||||
verifier.continue_upload(id, &chunk)?; |
||||
} else { |
||||
let reference = verifier.finish_upload(id)?; |
||||
return Ok(AppResponse::V0(AppResponseV0::FileUploaded(reference))); |
||||
} |
||||
} |
||||
_ => return Err(NgError::InvalidPayload), |
||||
}, |
||||
}, |
||||
|
||||
_ => unimplemented!(), |
||||
} |
||||
Ok(AppResponse::V0(AppResponseV0::Ok)) |
||||
} |
||||
} |
Loading…
Reference in new issue