Initial DA service sketch (#376)
* Add basic da module and traits * Pipe new blobs and internal message handling * Add and pipe send attestation method
This commit is contained in:
parent
57746f5e76
commit
bca27bd27f
|
@ -9,6 +9,7 @@ members = [
|
||||||
"nomos-services/consensus",
|
"nomos-services/consensus",
|
||||||
"nomos-services/mempool",
|
"nomos-services/mempool",
|
||||||
"nomos-services/http",
|
"nomos-services/http",
|
||||||
|
"nomos-services/data-availability",
|
||||||
"nomos-da-core/reed-solomon",
|
"nomos-da-core/reed-solomon",
|
||||||
"nomos-da-core/kzg",
|
"nomos-da-core/kzg",
|
||||||
"nodes/nomos-node",
|
"nodes/nomos-node",
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
[package]
|
||||||
|
name = "nomos-da"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-trait = "0.1"
|
||||||
|
futures = "0.3"
|
||||||
|
nomos-network = { path = "../network" }
|
||||||
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
|
tracing = "0.1"
|
||||||
|
tokio = { version = "1", features = ["sync", "macros"] }
|
|
@ -0,0 +1,18 @@
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DaError {
|
||||||
|
Dyn(DynError),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait DaBackend {
|
||||||
|
type Settings: Clone;
|
||||||
|
|
||||||
|
type Blob;
|
||||||
|
|
||||||
|
fn new(settings: Self::Settings) -> Self;
|
||||||
|
|
||||||
|
fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>;
|
||||||
|
|
||||||
|
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
|
||||||
|
}
|
|
@ -0,0 +1,135 @@
|
||||||
|
mod backend;
|
||||||
|
mod network;
|
||||||
|
|
||||||
|
// std
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use std::fmt::{Debug, Formatter};
|
||||||
|
// crates
|
||||||
|
use futures::StreamExt;
|
||||||
|
use tokio::sync::oneshot::Sender;
|
||||||
|
// internal
|
||||||
|
use crate::backend::{DaBackend, DaError};
|
||||||
|
use crate::network::NetworkAdapter;
|
||||||
|
use nomos_network::NetworkService;
|
||||||
|
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||||
|
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
||||||
|
use overwatch_rs::services::state::{NoOperator, NoState};
|
||||||
|
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
|
||||||
|
|
||||||
|
pub struct DataAvailabilityService<B, N>
|
||||||
|
where
|
||||||
|
B: DaBackend,
|
||||||
|
B::Blob: 'static,
|
||||||
|
N: NetworkAdapter<Blob = B::Blob>,
|
||||||
|
{
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
backend: B,
|
||||||
|
network_relay: Relay<NetworkService<N::Backend>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum DaMsg<Blob> {
|
||||||
|
PendingBlobs {
|
||||||
|
reply_channel: Sender<Box<dyn Iterator<Item = Blob> + Send>>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Blob: 'static> Debug for DaMsg<Blob> {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
DaMsg::PendingBlobs { .. } => {
|
||||||
|
write!(f, "DaMsg::PendingBlobs")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Blob: 'static> RelayMessage for DaMsg<Blob> {}
|
||||||
|
|
||||||
|
impl<B, N> ServiceData for DataAvailabilityService<B, N>
|
||||||
|
where
|
||||||
|
B: DaBackend,
|
||||||
|
B::Blob: 'static,
|
||||||
|
N: NetworkAdapter<Blob = B::Blob>,
|
||||||
|
{
|
||||||
|
const SERVICE_ID: ServiceId = "DA";
|
||||||
|
type Settings = B::Settings;
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = DaMsg<B::Blob>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<B, N> ServiceCore for DataAvailabilityService<B, N>
|
||||||
|
where
|
||||||
|
B: DaBackend + Send,
|
||||||
|
B::Settings: Clone + Send + Sync + 'static,
|
||||||
|
B::Blob: Send,
|
||||||
|
// TODO: Reply type must be piped together, for now empty array.
|
||||||
|
N: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]> + Send + Sync,
|
||||||
|
{
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||||
|
let network_relay = service_state.overwatch_handle.relay();
|
||||||
|
let backend_settings = service_state.settings_reader.get_updated_settings();
|
||||||
|
let backend = B::new(backend_settings);
|
||||||
|
Ok(Self {
|
||||||
|
service_state,
|
||||||
|
backend,
|
||||||
|
network_relay,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), DynError> {
|
||||||
|
let Self {
|
||||||
|
mut service_state,
|
||||||
|
mut backend,
|
||||||
|
network_relay,
|
||||||
|
} = self;
|
||||||
|
|
||||||
|
let network_relay = network_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with NetworkService should succeed");
|
||||||
|
|
||||||
|
let adapter = N::new(network_relay).await;
|
||||||
|
let mut network_blobs = adapter.blob_stream().await;
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(blob) = network_blobs.next() => {
|
||||||
|
if let Err(e) = handle_new_blob(&mut backend, &adapter, blob).await {
|
||||||
|
tracing::debug!("Failed to add a new received blob: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(msg) = service_state.inbound_relay.recv() => {
|
||||||
|
if let Err(e) = handle_da_msg(&mut backend, msg).await {
|
||||||
|
tracing::debug!("Failed to handle da msg: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_new_blob<B: DaBackend, A: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]>>(
|
||||||
|
backend: &mut B,
|
||||||
|
adapter: &A,
|
||||||
|
blob: B::Blob,
|
||||||
|
) -> Result<(), DaError> {
|
||||||
|
// we need to handle the reply (verification + signature)
|
||||||
|
backend.add_blob(blob)?;
|
||||||
|
adapter
|
||||||
|
.send_attestation([0u8; 32])
|
||||||
|
.await
|
||||||
|
.map_err(DaError::Dyn)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError> {
|
||||||
|
match msg {
|
||||||
|
DaMsg::PendingBlobs { reply_channel } => {
|
||||||
|
let pending_blobs = backend.pending_blobs();
|
||||||
|
if reply_channel.send(pending_blobs).is_err() {
|
||||||
|
tracing::debug!("Could not send pending blobs");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -0,0 +1,25 @@
|
||||||
|
// std
|
||||||
|
// crates
|
||||||
|
use futures::Stream;
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
// internal
|
||||||
|
use nomos_network::backends::NetworkBackend;
|
||||||
|
use nomos_network::NetworkService;
|
||||||
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
use overwatch_rs::services::ServiceData;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait NetworkAdapter {
|
||||||
|
type Backend: NetworkBackend + 'static;
|
||||||
|
|
||||||
|
type Blob: Send + Sync + 'static;
|
||||||
|
type Reply: Send + Sync + 'static;
|
||||||
|
|
||||||
|
async fn new(
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
|
) -> Self;
|
||||||
|
|
||||||
|
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
|
||||||
|
|
||||||
|
async fn send_attestation(&self, attestation: Self::Reply) -> Result<(), DynError>;
|
||||||
|
}
|
Loading…
Reference in New Issue