From 3b90cb786cc881863b9c8475c7cf690f1bffc6d1 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Mon, 11 Sep 2023 12:50:25 +0200 Subject: [PATCH] Da service network (#384) * Make da backend async * Added remove blob * Added send_blob method to network adapter trait * Added libp2p backend Implemented blob stream * Implement attestation stream * Implement send methods --- nomos-services/data-availability/Cargo.toml | 5 + .../src/network/adapters/libp2p.rs | 113 ++++++++++++++++++ .../src/network/adapters/mod.rs | 2 + .../data-availability/src/network/mod.rs | 12 +- 4 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 nomos-services/data-availability/src/network/adapters/libp2p.rs create mode 100644 nomos-services/data-availability/src/network/adapters/mod.rs diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml index 2a60bbc4..b1cfcbfd 100644 --- a/nomos-services/data-availability/Cargo.toml +++ b/nomos-services/data-availability/Cargo.toml @@ -12,5 +12,10 @@ moka = { version = "0.11", features = ["future"] } nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../network" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +serde = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync", "macros"] } +tokio-stream = "0.1" + +[features] +libp2p = ["nomos-network/nomos-libp2p"] \ No newline at end of file diff --git a/nomos-services/data-availability/src/network/adapters/libp2p.rs b/nomos-services/data-availability/src/network/adapters/libp2p.rs new file mode 100644 index 00000000..0755acfb --- /dev/null +++ b/nomos-services/data-availability/src/network/adapters/libp2p.rs @@ -0,0 +1,113 @@ +// std +use futures::Stream; +use overwatch_rs::DynError; +use std::marker::PhantomData; +// crates + +// internal +use crate::network::NetworkAdapter; +use nomos_core::wire; +use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use serde::de::DeserializeOwned; +use serde::Serialize; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; +use tracing::log::error; + +pub const NOMOS_DA_TOPIC: &str = "NomosDa"; + +pub struct Libp2pAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, + _blob: PhantomData, + _attestation: PhantomData, +} + +impl Libp2pAdapter +where + B: Serialize + DeserializeOwned + Send + Sync + 'static, + A: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + async fn stream_for(&self) -> Box + Unpin + Send> { + let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC); + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.network_relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + .expect("Network backend should be ready"); + let receiver = receiver.await.unwrap(); + Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( + move |msg| match msg { + Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => { + match wire::deserialize::(&data) { + Ok(msg) => Some(msg), + Err(e) => { + error!("Unrecognized Blob message: {e}"); + None + } + } + } + _ => None, + }, + ))) + } + + async fn send(&self, data: E) -> Result<(), DynError> { + let message = wire::serialize(&data)?.into_boxed_slice(); + self.network_relay + .send(NetworkMsg::Process(Command::Broadcast { + topic: NOMOS_DA_TOPIC.to_string(), + message, + })) + .await + .map_err(|(e, _)| Box::new(e) as DynError) + } +} + +#[async_trait::async_trait] +impl NetworkAdapter for Libp2pAdapter +where + B: Serialize + DeserializeOwned + Send + Sync + 'static, + A: Serialize + DeserializeOwned + Send + Sync + 'static, +{ + type Backend = Libp2p; + type Blob = B; + type Attestation = A; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + network_relay + .send(NetworkMsg::Process(Command::Subscribe( + NOMOS_DA_TOPIC.to_string(), + ))) + .await + .expect("Network backend should be ready"); + Self { + network_relay, + _blob: Default::default(), + _attestation: Default::default(), + } + } + + async fn blob_stream(&self) -> Box + Unpin + Send> { + self.stream_for::().await + } + + async fn attestation_stream(&self) -> Box + Unpin + Send> { + self.stream_for::().await + } + + async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> { + self.send(attestation).await + } + + async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> { + self.send(blob).await + } +} diff --git a/nomos-services/data-availability/src/network/adapters/mod.rs b/nomos-services/data-availability/src/network/adapters/mod.rs new file mode 100644 index 00000000..a22ade97 --- /dev/null +++ b/nomos-services/data-availability/src/network/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index 38dd1f2f..d6b783d6 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -1,3 +1,5 @@ +mod adapters; + // std // crates use futures::Stream; @@ -7,13 +9,15 @@ use nomos_network::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; use overwatch_rs::DynError; +use serde::de::DeserializeOwned; +use serde::Serialize; #[async_trait::async_trait] pub trait NetworkAdapter { type Backend: NetworkBackend + 'static; - type Blob: Send + Sync + 'static; - type Attestation: Send + Sync + 'static; + type Blob: Serialize + DeserializeOwned + Send + Sync + 'static; + type Attestation: Serialize + DeserializeOwned + Send + Sync + 'static; async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, @@ -21,5 +25,9 @@ pub trait NetworkAdapter { async fn blob_stream(&self) -> Box + Unpin + Send>; + async fn attestation_stream(&self) -> Box + Unpin + Send>; + async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>; + + async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>; }