From bca27bd27fde5eeaf620021ca833a6049494386e Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 6 Sep 2023 15:11:15 +0200 Subject: [PATCH] Initial DA service sketch (#376) * Add basic da module and traits * Pipe new blobs and internal message handling * Add and pipe send attestation method --- Cargo.toml | 1 + nomos-services/data-availability/Cargo.toml | 14 ++ .../data-availability/src/backend/mod.rs | 18 +++ nomos-services/data-availability/src/lib.rs | 135 ++++++++++++++++++ .../data-availability/src/network/mod.rs | 25 ++++ 5 files changed, 193 insertions(+) create mode 100644 nomos-services/data-availability/Cargo.toml create mode 100644 nomos-services/data-availability/src/backend/mod.rs create mode 100644 nomos-services/data-availability/src/lib.rs create mode 100644 nomos-services/data-availability/src/network/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 9f476b4e..816dc9cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "nomos-services/consensus", "nomos-services/mempool", "nomos-services/http", + "nomos-services/data-availability", "nomos-da-core/reed-solomon", "nomos-da-core/kzg", "nodes/nomos-node", diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml new file mode 100644 index 00000000..a4816c73 --- /dev/null +++ b/nomos-services/data-availability/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "nomos-da" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +futures = "0.3" +nomos-network = { path = "../network" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +tracing = "0.1" +tokio = { version = "1", features = ["sync", "macros"] } diff --git a/nomos-services/data-availability/src/backend/mod.rs b/nomos-services/data-availability/src/backend/mod.rs new file mode 100644 index 00000000..52ee258a --- /dev/null +++ b/nomos-services/data-availability/src/backend/mod.rs @@ -0,0 +1,18 @@ +use overwatch_rs::DynError; + +#[derive(Debug)] +pub enum DaError { + Dyn(DynError), +} + +pub trait DaBackend { + type Settings: Clone; + + type Blob; + + fn new(settings: Self::Settings) -> Self; + + fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>; + + fn pending_blobs(&self) -> Box + Send>; +} diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs new file mode 100644 index 00000000..9101a5bf --- /dev/null +++ b/nomos-services/data-availability/src/lib.rs @@ -0,0 +1,135 @@ +mod backend; +mod network; + +// std +use overwatch_rs::DynError; +use std::fmt::{Debug, Formatter}; +// crates +use futures::StreamExt; +use tokio::sync::oneshot::Sender; +// internal +use crate::backend::{DaBackend, DaError}; +use crate::network::NetworkAdapter; +use nomos_network::NetworkService; +use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::relay::{Relay, RelayMessage}; +use overwatch_rs::services::state::{NoOperator, NoState}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; + +pub struct DataAvailabilityService +where + B: DaBackend, + B::Blob: 'static, + N: NetworkAdapter, +{ + service_state: ServiceStateHandle, + backend: B, + network_relay: Relay>, +} + +pub enum DaMsg { + PendingBlobs { + reply_channel: Sender + Send>>, + }, +} + +impl Debug for DaMsg { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DaMsg::PendingBlobs { .. } => { + write!(f, "DaMsg::PendingBlobs") + } + } + } +} + +impl RelayMessage for DaMsg {} + +impl ServiceData for DataAvailabilityService +where + B: DaBackend, + B::Blob: 'static, + N: NetworkAdapter, +{ + const SERVICE_ID: ServiceId = "DA"; + type Settings = B::Settings; + type State = NoState; + type StateOperator = NoOperator; + type Message = DaMsg; +} + +#[async_trait::async_trait] +impl ServiceCore for DataAvailabilityService +where + B: DaBackend + Send, + B::Settings: Clone + Send + Sync + 'static, + B::Blob: Send, + // TODO: Reply type must be piped together, for now empty array. + N: NetworkAdapter + Send + Sync, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let network_relay = service_state.overwatch_handle.relay(); + let backend_settings = service_state.settings_reader.get_updated_settings(); + let backend = B::new(backend_settings); + Ok(Self { + service_state, + backend, + network_relay, + }) + } + + async fn run(self) -> Result<(), DynError> { + let Self { + mut service_state, + mut backend, + network_relay, + } = self; + + let network_relay = network_relay + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + let adapter = N::new(network_relay).await; + let mut network_blobs = adapter.blob_stream().await; + loop { + tokio::select! { + Some(blob) = network_blobs.next() => { + if let Err(e) = handle_new_blob(&mut backend, &adapter, blob).await { + tracing::debug!("Failed to add a new received blob: {e:?}"); + } + } + Some(msg) = service_state.inbound_relay.recv() => { + if let Err(e) = handle_da_msg(&mut backend, msg).await { + tracing::debug!("Failed to handle da msg: {e:?}"); + } + } + } + } + } +} + +async fn handle_new_blob>( + backend: &mut B, + adapter: &A, + blob: B::Blob, +) -> Result<(), DaError> { + // we need to handle the reply (verification + signature) + backend.add_blob(blob)?; + adapter + .send_attestation([0u8; 32]) + .await + .map_err(DaError::Dyn) +} + +async fn handle_da_msg(backend: &mut B, msg: DaMsg) -> Result<(), DaError> { + match msg { + DaMsg::PendingBlobs { reply_channel } => { + let pending_blobs = backend.pending_blobs(); + if reply_channel.send(pending_blobs).is_err() { + tracing::debug!("Could not send pending blobs"); + } + } + } + Ok(()) +} diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs new file mode 100644 index 00000000..c09dcd31 --- /dev/null +++ b/nomos-services/data-availability/src/network/mod.rs @@ -0,0 +1,25 @@ +// std +// crates +use futures::Stream; +use overwatch_rs::DynError; +// internal +use nomos_network::backends::NetworkBackend; +use nomos_network::NetworkService; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; + +#[async_trait::async_trait] +pub trait NetworkAdapter { + type Backend: NetworkBackend + 'static; + + type Blob: Send + Sync + 'static; + type Reply: Send + Sync + 'static; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self; + + async fn blob_stream(&self) -> Box + Unpin + Send>; + + async fn send_attestation(&self, attestation: Self::Reply) -> Result<(), DynError>; +}