|
|
@ -154,11 +154,11 @@ impl Verifier { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub(crate) async fn push_app_response(&mut self, branch: &BranchId, response: AppResponse) { |
|
|
|
pub(crate) async fn push_app_response(&mut self, branch: &BranchId, response: AppResponse) { |
|
|
|
log_info!( |
|
|
|
// log_info!(
|
|
|
|
"push_app_response {} {:?}", |
|
|
|
// "push_app_response {} {:?}",
|
|
|
|
branch, |
|
|
|
// branch,
|
|
|
|
self.branch_subscriptions |
|
|
|
// self.branch_subscriptions
|
|
|
|
); |
|
|
|
// );
|
|
|
|
if let Some(sender) = self.branch_subscriptions.get_mut(branch) { |
|
|
|
if let Some(sender) = self.branch_subscriptions.get_mut(branch) { |
|
|
|
if sender.is_closed() { |
|
|
|
if sender.is_closed() { |
|
|
|
log_info!("closed so removed"); |
|
|
|
log_info!("closed so removed"); |
|
|
@ -172,7 +172,6 @@ impl Verifier { |
|
|
|
pub(crate) async fn create_branch_subscription( |
|
|
|
pub(crate) async fn create_branch_subscription( |
|
|
|
&mut self, |
|
|
|
&mut self, |
|
|
|
branch: BranchId, |
|
|
|
branch: BranchId, |
|
|
|
resub: bool, |
|
|
|
|
|
|
|
) -> Result<(Receiver<AppResponse>, CancelFn), VerifierError> { |
|
|
|
) -> Result<(Receiver<AppResponse>, CancelFn), VerifierError> { |
|
|
|
// async fn send(mut tx: Sender<AppResponse>, msg: AppResponse) -> ResultSend<()> {
|
|
|
|
// async fn send(mut tx: Sender<AppResponse>, msg: AppResponse) -> ResultSend<()> {
|
|
|
|
// while let Ok(_) = tx.send(msg.clone()).await {
|
|
|
|
// while let Ok(_) = tx.send(msg.clone()).await {
|
|
|
@ -185,26 +184,25 @@ impl Verifier { |
|
|
|
// spawn_and_log_error(send(tx.clone(), commit));
|
|
|
|
// spawn_and_log_error(send(tx.clone(), commit));
|
|
|
|
//log_info!("#### create_branch_subscription {}", branch);
|
|
|
|
//log_info!("#### create_branch_subscription {}", branch);
|
|
|
|
let (tx, rx) = mpsc::unbounded::<AppResponse>(); |
|
|
|
let (tx, rx) = mpsc::unbounded::<AppResponse>(); |
|
|
|
log_info!("SUBSCRIBE"); |
|
|
|
//log_info!("SUBSCRIBE");
|
|
|
|
if let Some(returned) = self.branch_subscriptions.insert(branch, tx.clone()) { |
|
|
|
if let Some(returned) = self.branch_subscriptions.insert(branch, tx.clone()) { |
|
|
|
log_info!("RESUBSCRIBE"); |
|
|
|
//log_info!("RESUBSCRIBE");
|
|
|
|
if !returned.is_closed() { |
|
|
|
if !returned.is_closed() { |
|
|
|
log_info!("FORCE CLOSE"); |
|
|
|
//log_info!("FORCE CLOSE");
|
|
|
|
returned.close_channel(); |
|
|
|
returned.close_channel(); |
|
|
|
//return Err(VerifierError::DoubleBranchSubscription);
|
|
|
|
//return Err(VerifierError::DoubleBranchSubscription);
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if !resub { |
|
|
|
|
|
|
|
//let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {});
|
|
|
|
//let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {});
|
|
|
|
for file in self |
|
|
|
for file in self |
|
|
|
.user_storage |
|
|
|
.user_storage |
|
|
|
.as_ref() |
|
|
|
.as_ref() |
|
|
|
.unwrap() |
|
|
|
.unwrap() |
|
|
|
.branch_get_all_files(&branch)? |
|
|
|
.branch_get_all_files(&branch)? |
|
|
|
{ |
|
|
|
{ |
|
|
|
self.push_app_response(&branch, AppResponse::V0(AppResponseV0::File(file))) |
|
|
|
self.push_app_response(&branch, AppResponse::V0(AppResponseV0::File(file))) |
|
|
|
.await; |
|
|
|
.await; |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
let fnonce = Box::new(move || { |
|
|
|
let fnonce = Box::new(move || { |
|
|
|