Pipe lifecycle in da service
This commit is contained in:
parent
8be76f5381
commit
0e49e06f56
|
@ -14,9 +14,11 @@ use crate::network::NetworkAdapter;
|
||||||
use nomos_core::da::{blob::Blob, DaProtocol};
|
use nomos_core::da::{blob::Blob, DaProtocol};
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
use overwatch_rs::services::handle::ServiceStateHandle;
|
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||||
|
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||||
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
||||||
use overwatch_rs::services::state::{NoOperator, NoState};
|
use overwatch_rs::services::state::{NoOperator, NoState};
|
||||||
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
|
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
pub struct DataAvailabilityService<Protocol, Backend, Network>
|
pub struct DataAvailabilityService<Protocol, Backend, Network>
|
||||||
where
|
where
|
||||||
|
@ -70,6 +72,76 @@ where
|
||||||
type Message = DaMsg<Protocol::Blob>;
|
type Message = DaMsg<Protocol::Blob>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<Protocol, Backend, Network> DataAvailabilityService<Protocol, Backend, Network>
|
||||||
|
where
|
||||||
|
Protocol: DaProtocol + Send + Sync,
|
||||||
|
Backend: DaBackend<Blob = Protocol::Blob> + Send + Sync,
|
||||||
|
Protocol::Settings: Clone + Send + Sync + 'static,
|
||||||
|
Protocol::Blob: 'static,
|
||||||
|
Backend::Settings: Clone + Send + Sync + 'static,
|
||||||
|
Protocol::Blob: Send,
|
||||||
|
Protocol::Attestation: Send,
|
||||||
|
<Backend::Blob as Blob>::Hash: Debug + Send + Sync,
|
||||||
|
Network:
|
||||||
|
NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
|
||||||
|
{
|
||||||
|
async fn handle_new_blob(
|
||||||
|
da: &Protocol,
|
||||||
|
backend: &Backend,
|
||||||
|
adapter: &Network,
|
||||||
|
blob: Protocol::Blob,
|
||||||
|
) -> Result<(), DaError> {
|
||||||
|
// we need to handle the reply (verification + signature)
|
||||||
|
let attestation = da.attest(&blob);
|
||||||
|
backend.add_blob(blob).await?;
|
||||||
|
// we do not call `da.recv_blob` here because that is meant to
|
||||||
|
// be called to retrieve the original data, while here we're only interested
|
||||||
|
// in storing the blob.
|
||||||
|
// We might want to refactor the backend to be part of implementations of the
|
||||||
|
// Da protocol instead of this service and clear this confusion.
|
||||||
|
adapter
|
||||||
|
.send_attestation(attestation)
|
||||||
|
.await
|
||||||
|
.map_err(DaError::Dyn)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_da_msg(backend: &Backend, msg: DaMsg<Backend::Blob>) -> Result<(), DaError> {
|
||||||
|
match msg {
|
||||||
|
DaMsg::RemoveBlobs { blobs } => {
|
||||||
|
futures::stream::iter(blobs)
|
||||||
|
.for_each_concurrent(None, |blob| async move {
|
||||||
|
if let Err(e) = backend.remove_blob(&blob).await {
|
||||||
|
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
DaMsg::Get { ids, reply_channel } => {
|
||||||
|
let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
|
||||||
|
if reply_channel.send(res).is_err() {
|
||||||
|
tracing::error!("Could not returns blobs");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_lifecycle_message(message: LifecycleMessage) -> bool {
|
||||||
|
match message {
|
||||||
|
LifecycleMessage::Shutdown(sender) => {
|
||||||
|
if sender.send(()).is_err() {
|
||||||
|
error!(
|
||||||
|
"Error sending successful shutdown signal from service {}",
|
||||||
|
Self::SERVICE_ID
|
||||||
|
);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
LifecycleMessage::Kill => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<Protocol, Backend, Network> ServiceCore for DataAvailabilityService<Protocol, Backend, Network>
|
impl<Protocol, Backend, Network> ServiceCore for DataAvailabilityService<Protocol, Backend, Network>
|
||||||
where
|
where
|
||||||
|
@ -111,71 +183,30 @@ where
|
||||||
|
|
||||||
let adapter = Network::new(network_relay).await;
|
let adapter = Network::new(network_relay).await;
|
||||||
let mut network_blobs = adapter.blob_stream().await;
|
let mut network_blobs = adapter.blob_stream().await;
|
||||||
|
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(blob) = network_blobs.next() => {
|
Some(blob) = network_blobs.next() => {
|
||||||
if let Err(e) = handle_new_blob(&da, &backend, &adapter, blob).await {
|
if let Err(e) = Self::handle_new_blob(&da, &backend, &adapter, blob).await {
|
||||||
tracing::debug!("Failed to add a new received blob: {e:?}");
|
tracing::debug!("Failed to add a new received blob: {e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(msg) = service_state.inbound_relay.recv() => {
|
Some(msg) = service_state.inbound_relay.recv() => {
|
||||||
if let Err(e) = handle_da_msg(&backend, msg).await {
|
if let Err(e) = Self::handle_da_msg(&backend, msg).await {
|
||||||
tracing::debug!("Failed to handle da msg: {e:?}");
|
tracing::debug!("Failed to handle da msg: {e:?}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
|
if Self::handle_lifecycle_message(msg).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_new_blob<
|
|
||||||
Protocol: DaProtocol,
|
|
||||||
Backend: DaBackend<Blob = Protocol::Blob>,
|
|
||||||
A: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
|
|
||||||
>(
|
|
||||||
da: &Protocol,
|
|
||||||
backend: &Backend,
|
|
||||||
adapter: &A,
|
|
||||||
blob: Protocol::Blob,
|
|
||||||
) -> Result<(), DaError> {
|
|
||||||
// we need to handle the reply (verification + signature)
|
|
||||||
let attestation = da.attest(&blob);
|
|
||||||
backend.add_blob(blob).await?;
|
|
||||||
// we do not call `da.recv_blob` here because that is meant to
|
|
||||||
// be called to retrieve the original data, while here we're only interested
|
|
||||||
// in storing the blob.
|
|
||||||
// We might want to refactor the backend to be part of implementations of the
|
|
||||||
// Da protocol instead of this service and clear this confusion.
|
|
||||||
adapter
|
|
||||||
.send_attestation(attestation)
|
|
||||||
.await
|
|
||||||
.map_err(DaError::Dyn)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_da_msg<B: DaBackend>(backend: &B, msg: DaMsg<B::Blob>) -> Result<(), DaError>
|
|
||||||
where
|
|
||||||
<B::Blob as Blob>::Hash: Debug,
|
|
||||||
{
|
|
||||||
match msg {
|
|
||||||
DaMsg::RemoveBlobs { blobs } => {
|
|
||||||
futures::stream::iter(blobs)
|
|
||||||
.for_each_concurrent(None, |blob| async move {
|
|
||||||
if let Err(e) = backend.remove_blob(&blob).await {
|
|
||||||
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
DaMsg::Get { ids, reply_channel } => {
|
|
||||||
let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
|
|
||||||
if reply_channel.send(res).is_err() {
|
|
||||||
tracing::error!("Could not returns blobs");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||||
pub struct Settings<P, B> {
|
pub struct Settings<P, B> {
|
||||||
pub da_protocol: P,
|
pub da_protocol: P,
|
||||||
|
|
Loading…
Reference in New Issue