diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index 9ea7c496..f8a6759d 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -421,7 +421,7 @@ where let tx_selector = TxS::new(transaction_selector_settings); let blob_selector = BS::new(blob_selector_settings); - let mut incoming_blocks = adapter.blocks_stream().await; + let mut incoming_blocks = adapter.blocks_stream().await?; let mut leader = leadership::Leader::new(genesis_id, notes, config); let timer = time::Timer::new(time); diff --git a/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs b/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs index 306f3c39..1f6f6140 100644 --- a/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs +++ b/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs @@ -1,8 +1,9 @@ // std +use overwatch_rs::DynError; use std::hash::Hash; +use std::marker::PhantomData; // crates use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::broadcast::error::RecvError; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; // internal use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter}; @@ -12,9 +13,9 @@ use nomos_network::{ NetworkMsg, NetworkService, }; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; const TOPIC: &str = "/cryptarchia/proto"; -const BUFFER_SIZE: usize = 64; type Relay = OutboundRelay< as ServiceData>::Message>; #[derive(Clone)] @@ -24,7 +25,8 @@ where BlobCert: Clone + Eq + Hash, { network_relay: OutboundRelay< as ServiceData>::Message>, - blocks: tokio::sync::broadcast::Sender>, + _phantom_tx: PhantomData, + _blob_cert: PhantomData, } impl LibP2pAdapter @@ -45,8 +47,8 @@ where #[async_trait::async_trait] impl NetworkAdapter for LibP2pAdapter where - Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, - BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, + Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, + BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, { type Backend = Libp2p; type Tx = Tx; @@ -55,57 +57,54 @@ where async fn new(network_relay: Relay) -> Self { let relay = network_relay.clone(); Self::subscribe(&relay, TOPIC).await; - let blocks = tokio::sync::broadcast::Sender::new(BUFFER_SIZE); - let blocks_sender = blocks.clone(); tracing::debug!("Starting up..."); // this wait seems to be helpful in some cases since we give the time // to the network to establish connections before we start sending messages tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // TODO: maybe we need the runtime handle here? - tokio::spawn(async move { - let (sender, receiver) = tokio::sync::oneshot::channel(); - if let Err((e, _)) = relay - .send(NetworkMsg::Subscribe { - kind: EventKind::Message, - sender, - }) - .await - { - tracing::error!("error subscribing to incoming messages: {e}"); - } - - let mut incoming_messages = receiver.await.unwrap(); - loop { - match incoming_messages.recv().await { - Ok(Event::Message(message)) => { - match nomos_core::wire::deserialize(&message.data) { - Ok(msg) => match msg { - NetworkMessage::Block(block) => { - tracing::debug!("received block {:?}", block.header().id()); - if let Err(err) = blocks_sender.send(block) { - tracing::error!("error sending block to consensus: {err}"); - } - } - }, - _ => tracing::debug!("unrecognized gossipsub message"), - } - } - Err(RecvError::Lagged(n)) => { - tracing::error!("lagged messages: {n}") - } - Err(RecvError::Closed) => unreachable!(), - } - } - }); Self { network_relay, - blocks, + _phantom_tx: Default::default(), + _blob_cert: Default::default(), } } - async fn blocks_stream(&self) -> BoxedStream> { - Box::new(BroadcastStream::new(self.blocks.subscribe()).filter_map(Result::ok)) + async fn blocks_stream( + &self, + ) -> Result>, DynError> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + if let Err((e, _)) = self + .network_relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + { + return Err(Box::new(e)); + } + Ok(Box::new( + BroadcastStream::new(receiver.await.map_err(Box::new)?).filter_map(|message| { + match message { + Ok(Event::Message(message)) => match wire::deserialize(&message.data) { + Ok(msg) => match msg { + NetworkMessage::Block(block) => { + tracing::debug!("received block {:?}", block.header().id()); + Some(block) + } + }, + _ => { + tracing::debug!("unrecognized gossipsub message"); + None + } + }, + Err(BroadcastStreamRecvError::Lagged(n)) => { + tracing::error!("lagged messages: {n}"); + None + } + } + }), + )) } async fn broadcast(&self, message: NetworkMessage) { diff --git a/nomos-services/cryptarchia-consensus/src/network/mod.rs b/nomos-services/cryptarchia-consensus/src/network/mod.rs index 409669bc..e1a3bf6d 100644 --- a/nomos-services/cryptarchia-consensus/src/network/mod.rs +++ b/nomos-services/cryptarchia-consensus/src/network/mod.rs @@ -6,6 +6,7 @@ use std::hash::Hash; // crates use futures::Stream; use nomos_core::block::Block; +use overwatch_rs::DynError; // internal use crate::network::messages::NetworkMessage; use nomos_network::backends::NetworkBackend; @@ -24,6 +25,8 @@ pub trait NetworkAdapter { async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self; - async fn blocks_stream(&self) -> BoxedStream>; + async fn blocks_stream( + &self, + ) -> Result>, DynError>; async fn broadcast(&self, message: NetworkMessage); }