Do not exit the process during dissemination (#516)

* Do not exit the process during dissemination

This is now library code and we should not exit the process if an
error is encountered

* clippy happy
This commit is contained in:
Giacomo Pasini 2023-11-07 15:24:31 +01:00 committed by GitHub
parent 3028cdb5d1
commit 1b4c0638fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 7 deletions

View File

@ -1,5 +1,5 @@
use crate::da::disseminate::{ use crate::da::disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
}; };
use clap::Args; use clap::Args;
use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use nomos_network::{backends::libp2p::Libp2p, NetworkService};
@ -69,6 +69,10 @@ impl Disseminate {
drop(payload_sender); drop(payload_sender);
tracing::info!("{}", rx.recv().unwrap().display()); tracing::info!("{}", rx.recv().unwrap().display());
while let Ok(update) = rx.recv() { while let Ok(update) = rx.recv() {
if let Status::Err(e) = update {
tracing::error!("{e}");
return Err(e);
}
tracing::info!("{}", update.display()); tracing::info!("{}", update.display());
} }
tracing::info!("done"); tracing::info!("done");

View File

@ -33,7 +33,7 @@ pub async fn disseminate_and_wait<D, B, N, A, C>(
status_updates: Sender<Status>, status_updates: Sender<Status>,
node_addr: Option<&Url>, node_addr: Option<&Url>,
output: Option<&PathBuf>, output: Option<&PathBuf>,
) -> Result<(), Box<dyn std::error::Error>> ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where where
D: DaProtocol<Blob = B, Attestation = A, Certificate = C>, D: DaProtocol<Blob = B, Attestation = A, Certificate = C>,
N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync, N: NetworkAdapter<Blob = B, Attestation = A> + Send + Sync,
@ -47,7 +47,7 @@ where
status_updates.send(Status::Disseminating)?; status_updates.send(Status::Disseminating)?;
futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob))) futures::future::try_join_all(blobs.into_iter().map(|blob| adapter.send_blob(blob)))
.await .await
.map_err(|e| e as Box<dyn std::error::Error>)?; .map_err(|e| e as Box<dyn std::error::Error + Sync + Send>)?;
// 3) Collect attestations and create proof // 3) Collect attestations and create proof
status_updates.send(Status::WaitingAttestations)?; status_updates.send(Status::WaitingAttestations)?;
@ -70,7 +70,6 @@ where
status_updates.send(Status::SendingCert)?; status_updates.send(Status::SendingCert)?;
let res = send_certificate(node, &cert).await?; let res = send_certificate(node, &cert).await?;
tracing::info!("Response: {:?}", res);
if !res.status().is_success() { if !res.status().is_success() {
tracing::error!("ERROR: {:?}", res); tracing::error!("ERROR: {:?}", res);
return Err(format!("Failed to send certificate to node: {}", res.status()).into()); return Err(format!("Failed to send certificate to node: {}", res.status()).into());
@ -89,6 +88,7 @@ pub enum Status {
SavingCert, SavingCert,
SendingCert, SendingCert,
Done, Done,
Err(Box<dyn std::error::Error + Send + Sync>),
} }
impl Status { impl Status {
@ -101,6 +101,7 @@ impl Status {
Self::SavingCert => "Saving certificate to file", Self::SavingCert => "Saving certificate to file",
Self::SendingCert => "Sending certificate to node", Self::SendingCert => "Sending certificate to node",
Self::Done => "", Self::Done => "",
Self::Err(_) => "Error",
} }
} }
} }
@ -178,13 +179,13 @@ impl ServiceCore for DisseminateService {
{ {
Err(_) => { Err(_) => {
tracing::error!("Timeout reached, check the logs for additional details"); tracing::error!("Timeout reached, check the logs for additional details");
std::process::exit(1); let _ = status_updates.send(Status::Err("Timeout reached".into()));
} }
Ok(Err(_)) => { Ok(Err(e)) => {
tracing::error!( tracing::error!(
"Could not disseminate blob, check logs for additional details" "Could not disseminate blob, check logs for additional details"
); );
std::process::exit(1); let _ = status_updates.send(Status::Err(e));
} }
_ => {} _ => {}
} }