From 0e49e06f5611618dabe91f5ad86f72403e07da5d Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Tue, 10 Oct 2023 16:45:14 +0200 Subject: [PATCH] Pipe lifecycle in da service --- nomos-services/data-availability/src/lib.rs | 131 ++++++++++++-------- 1 file changed, 81 insertions(+), 50 deletions(-) diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index b0ebc255..d423ad45 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -14,9 +14,11 @@ use crate::network::NetworkAdapter; use nomos_core::da::{blob::Blob, DaProtocol}; use nomos_network::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use tracing::error; pub struct DataAvailabilityService where @@ -70,6 +72,76 @@ where type Message = DaMsg; } +impl DataAvailabilityService +where + Protocol: DaProtocol + Send + Sync, + Backend: DaBackend + Send + Sync, + Protocol::Settings: Clone + Send + Sync + 'static, + Protocol::Blob: 'static, + Backend::Settings: Clone + Send + Sync + 'static, + Protocol::Blob: Send, + Protocol::Attestation: Send, + ::Hash: Debug + Send + Sync, + Network: + NetworkAdapter + Send + Sync, +{ + async fn handle_new_blob( + da: &Protocol, + backend: &Backend, + adapter: &Network, + blob: Protocol::Blob, + ) -> Result<(), DaError> { + // we need to handle the reply (verification + signature) + let attestation = da.attest(&blob); + backend.add_blob(blob).await?; + // we do not call `da.recv_blob` here because that is meant to + // be called to retrieve the original data, while here we're only interested + // in storing the blob. + // We might want to refactor the backend to be part of implementations of the + // Da protocol instead of this service and clear this confusion. + adapter + .send_attestation(attestation) + .await + .map_err(DaError::Dyn) + } + + async fn handle_da_msg(backend: &Backend, msg: DaMsg) -> Result<(), DaError> { + match msg { + DaMsg::RemoveBlobs { blobs } => { + futures::stream::iter(blobs) + .for_each_concurrent(None, |blob| async move { + if let Err(e) = backend.remove_blob(&blob).await { + tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); + } + }) + .await; + } + DaMsg::Get { ids, reply_channel } => { + let res = ids.filter_map(|id| backend.get_blob(&id)).collect(); + if reply_channel.send(res).is_err() { + tracing::error!("Could not returns blobs"); + } + } + } + Ok(()) + } + + async fn handle_lifecycle_message(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } +} + #[async_trait::async_trait] impl ServiceCore for DataAvailabilityService where @@ -111,71 +183,30 @@ where let adapter = Network::new(network_relay).await; let mut network_blobs = adapter.blob_stream().await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { tokio::select! { Some(blob) = network_blobs.next() => { - if let Err(e) = handle_new_blob(&da, &backend, &adapter, blob).await { + if let Err(e) = Self::handle_new_blob(&da, &backend, &adapter, blob).await { tracing::debug!("Failed to add a new received blob: {e:?}"); } } Some(msg) = service_state.inbound_relay.recv() => { - if let Err(e) = handle_da_msg(&backend, msg).await { + if let Err(e) = Self::handle_da_msg(&backend, msg).await { tracing::debug!("Failed to handle da msg: {e:?}"); } } + Some(msg) = lifecycle_stream.next() => { + if Self::handle_lifecycle_message(msg).await { + break; + } + } } } + Ok(()) } } -async fn handle_new_blob< - Protocol: DaProtocol, - Backend: DaBackend, - A: NetworkAdapter, ->( - da: &Protocol, - backend: &Backend, - adapter: &A, - blob: Protocol::Blob, -) -> Result<(), DaError> { - // we need to handle the reply (verification + signature) - let attestation = da.attest(&blob); - backend.add_blob(blob).await?; - // we do not call `da.recv_blob` here because that is meant to - // be called to retrieve the original data, while here we're only interested - // in storing the blob. - // We might want to refactor the backend to be part of implementations of the - // Da protocol instead of this service and clear this confusion. - adapter - .send_attestation(attestation) - .await - .map_err(DaError::Dyn) -} - -async fn handle_da_msg(backend: &B, msg: DaMsg) -> Result<(), DaError> -where - ::Hash: Debug, -{ - match msg { - DaMsg::RemoveBlobs { blobs } => { - futures::stream::iter(blobs) - .for_each_concurrent(None, |blob| async move { - if let Err(e) = backend.remove_blob(&blob).await { - tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); - } - }) - .await; - } - DaMsg::Get { ids, reply_channel } => { - let res = ids.filter_map(|id| backend.get_blob(&id)).collect(); - if reply_channel.send(res).is_err() { - tracing::error!("Could not returns blobs"); - } - } - } - Ok(()) -} - #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct Settings { pub da_protocol: P,