Da service network (#384)
* Make da backend async * Added remove blob * Added send_blob method to network adapter trait * Added libp2p backend Implemented blob stream * Implement attestation stream * Implement send methods
This commit is contained in:
parent
96e3c2d499
commit
3b90cb786c
@ -12,5 +12,10 @@ moka = { version = "0.11", features = ["future"] }
|
|||||||
nomos-core = { path = "../../nomos-core" }
|
nomos-core = { path = "../../nomos-core" }
|
||||||
nomos-network = { path = "../network" }
|
nomos-network = { path = "../network" }
|
||||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
|
serde = "1.0"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tokio = { version = "1", features = ["sync", "macros"] }
|
tokio = { version = "1", features = ["sync", "macros"] }
|
||||||
|
tokio-stream = "0.1"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
libp2p = ["nomos-network/nomos-libp2p"]
|
113
nomos-services/data-availability/src/network/adapters/libp2p.rs
Normal file
113
nomos-services/data-availability/src/network/adapters/libp2p.rs
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
// std
|
||||||
|
use futures::Stream;
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
// crates
|
||||||
|
|
||||||
|
// internal
|
||||||
|
use crate::network::NetworkAdapter;
|
||||||
|
use nomos_core::wire;
|
||||||
|
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
|
||||||
|
use nomos_network::{NetworkMsg, NetworkService};
|
||||||
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
use overwatch_rs::services::ServiceData;
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::Serialize;
|
||||||
|
use tokio_stream::wrappers::BroadcastStream;
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
use tracing::log::error;
|
||||||
|
|
||||||
|
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
|
||||||
|
|
||||||
|
pub struct Libp2pAdapter<B, A> {
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||||
|
_blob: PhantomData<B>,
|
||||||
|
_attestation: PhantomData<A>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B, A> Libp2pAdapter<B, A>
|
||||||
|
where
|
||||||
|
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||||
|
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
async fn stream_for<E: DeserializeOwned>(&self) -> Box<dyn Stream<Item = E> + Unpin + Send> {
|
||||||
|
let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC);
|
||||||
|
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||||
|
self.network_relay
|
||||||
|
.send(NetworkMsg::Subscribe {
|
||||||
|
kind: EventKind::Message,
|
||||||
|
sender,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("Network backend should be ready");
|
||||||
|
let receiver = receiver.await.unwrap();
|
||||||
|
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
|
||||||
|
move |msg| match msg {
|
||||||
|
Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => {
|
||||||
|
match wire::deserialize::<E>(&data) {
|
||||||
|
Ok(msg) => Some(msg),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Unrecognized Blob message: {e}");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send<E: Serialize>(&self, data: E) -> Result<(), DynError> {
|
||||||
|
let message = wire::serialize(&data)?.into_boxed_slice();
|
||||||
|
self.network_relay
|
||||||
|
.send(NetworkMsg::Process(Command::Broadcast {
|
||||||
|
topic: NOMOS_DA_TOPIC.to_string(),
|
||||||
|
message,
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.map_err(|(e, _)| Box::new(e) as DynError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
|
||||||
|
where
|
||||||
|
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||||
|
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
type Backend = Libp2p;
|
||||||
|
type Blob = B;
|
||||||
|
type Attestation = A;
|
||||||
|
|
||||||
|
async fn new(
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
|
) -> Self {
|
||||||
|
network_relay
|
||||||
|
.send(NetworkMsg::Process(Command::Subscribe(
|
||||||
|
NOMOS_DA_TOPIC.to_string(),
|
||||||
|
)))
|
||||||
|
.await
|
||||||
|
.expect("Network backend should be ready");
|
||||||
|
Self {
|
||||||
|
network_relay,
|
||||||
|
_blob: Default::default(),
|
||||||
|
_attestation: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send> {
|
||||||
|
self.stream_for::<Self::Blob>().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send> {
|
||||||
|
self.stream_for::<Self::Attestation>().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> {
|
||||||
|
self.send(attestation).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> {
|
||||||
|
self.send(blob).await
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,2 @@
|
|||||||
|
#[cfg(feature = "libp2p")]
|
||||||
|
pub mod libp2p;
|
@ -1,3 +1,5 @@
|
|||||||
|
mod adapters;
|
||||||
|
|
||||||
// std
|
// std
|
||||||
// crates
|
// crates
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
@ -7,13 +9,15 @@ use nomos_network::NetworkService;
|
|||||||
use overwatch_rs::services::relay::OutboundRelay;
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
use overwatch_rs::services::ServiceData;
|
use overwatch_rs::services::ServiceData;
|
||||||
use overwatch_rs::DynError;
|
use overwatch_rs::DynError;
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait NetworkAdapter {
|
pub trait NetworkAdapter {
|
||||||
type Backend: NetworkBackend + 'static;
|
type Backend: NetworkBackend + 'static;
|
||||||
|
|
||||||
type Blob: Send + Sync + 'static;
|
type Blob: Serialize + DeserializeOwned + Send + Sync + 'static;
|
||||||
type Attestation: Send + Sync + 'static;
|
type Attestation: Serialize + DeserializeOwned + Send + Sync + 'static;
|
||||||
|
|
||||||
async fn new(
|
async fn new(
|
||||||
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
@ -21,5 +25,9 @@ pub trait NetworkAdapter {
|
|||||||
|
|
||||||
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
|
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
|
||||||
|
|
||||||
|
async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send>;
|
||||||
|
|
||||||
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
|
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
|
||||||
|
|
||||||
|
async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user