1
0
mirror of synced 2025-01-24 14:39:18 +00:00

Remove task from consensus network adapter (#828)

This commit is contained in:
Daniel Sanchez 2024-10-18 16:25:08 +02:00 committed by GitHub
parent 41a9387b4b
commit ba726bad84
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 50 additions and 48 deletions

View File

@ -421,7 +421,7 @@ where
let tx_selector = TxS::new(transaction_selector_settings); let tx_selector = TxS::new(transaction_selector_settings);
let blob_selector = BS::new(blob_selector_settings); let blob_selector = BS::new(blob_selector_settings);
let mut incoming_blocks = adapter.blocks_stream().await; let mut incoming_blocks = adapter.blocks_stream().await?;
let mut leader = leadership::Leader::new(genesis_id, notes, config); let mut leader = leadership::Leader::new(genesis_id, notes, config);
let timer = time::Timer::new(time); let timer = time::Timer::new(time);

View File

@ -1,8 +1,9 @@
// std // std
use overwatch_rs::DynError;
use std::hash::Hash; use std::hash::Hash;
use std::marker::PhantomData;
// crates // crates
use serde::{de::DeserializeOwned, Serialize}; use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tokio_stream::{wrappers::BroadcastStream, StreamExt};
// internal // internal
use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter}; use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter};
@ -12,9 +13,9 @@ use nomos_network::{
NetworkMsg, NetworkService, NetworkMsg, NetworkService,
}; };
use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
const TOPIC: &str = "/cryptarchia/proto"; const TOPIC: &str = "/cryptarchia/proto";
const BUFFER_SIZE: usize = 64;
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>; type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
#[derive(Clone)] #[derive(Clone)]
@ -24,7 +25,8 @@ where
BlobCert: Clone + Eq + Hash, BlobCert: Clone + Eq + Hash,
{ {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
blocks: tokio::sync::broadcast::Sender<Block<Tx, BlobCert>>, _phantom_tx: PhantomData<Tx>,
_blob_cert: PhantomData<BlobCert>,
} }
impl<Tx, BlobCert> LibP2pAdapter<Tx, BlobCert> impl<Tx, BlobCert> LibP2pAdapter<Tx, BlobCert>
@ -45,8 +47,8 @@ where
#[async_trait::async_trait] #[async_trait::async_trait]
impl<Tx, BlobCert> NetworkAdapter for LibP2pAdapter<Tx, BlobCert> impl<Tx, BlobCert> NetworkAdapter for LibP2pAdapter<Tx, BlobCert>
where where
Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
{ {
type Backend = Libp2p; type Backend = Libp2p;
type Tx = Tx; type Tx = Tx;
@ -55,57 +57,54 @@ where
async fn new(network_relay: Relay<Libp2p>) -> Self { async fn new(network_relay: Relay<Libp2p>) -> Self {
let relay = network_relay.clone(); let relay = network_relay.clone();
Self::subscribe(&relay, TOPIC).await; Self::subscribe(&relay, TOPIC).await;
let blocks = tokio::sync::broadcast::Sender::new(BUFFER_SIZE);
let blocks_sender = blocks.clone();
tracing::debug!("Starting up..."); tracing::debug!("Starting up...");
// this wait seems to be helpful in some cases since we give the time // this wait seems to be helpful in some cases since we give the time
// to the network to establish connections before we start sending messages // to the network to establish connections before we start sending messages
tokio::time::sleep(std::time::Duration::from_secs(1)).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// TODO: maybe we need the runtime handle here? Self {
tokio::spawn(async move { network_relay,
_phantom_tx: Default::default(),
_blob_cert: Default::default(),
}
}
async fn blocks_stream(
&self,
) -> Result<BoxedStream<Block<Self::Tx, Self::BlobCertificate>>, DynError> {
let (sender, receiver) = tokio::sync::oneshot::channel(); let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((e, _)) = relay if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Subscribe { .send(NetworkMsg::Subscribe {
kind: EventKind::Message, kind: EventKind::Message,
sender, sender,
}) })
.await .await
{ {
tracing::error!("error subscribing to incoming messages: {e}"); return Err(Box::new(e));
} }
Ok(Box::new(
let mut incoming_messages = receiver.await.unwrap(); BroadcastStream::new(receiver.await.map_err(Box::new)?).filter_map(|message| {
loop { match message {
match incoming_messages.recv().await { Ok(Event::Message(message)) => match wire::deserialize(&message.data) {
Ok(Event::Message(message)) => {
match nomos_core::wire::deserialize(&message.data) {
Ok(msg) => match msg { Ok(msg) => match msg {
NetworkMessage::Block(block) => { NetworkMessage::Block(block) => {
tracing::debug!("received block {:?}", block.header().id()); tracing::debug!("received block {:?}", block.header().id());
if let Err(err) = blocks_sender.send(block) { Some(block)
tracing::error!("error sending block to consensus: {err}");
}
} }
}, },
_ => tracing::debug!("unrecognized gossipsub message"), _ => {
tracing::debug!("unrecognized gossipsub message");
None
}
},
Err(BroadcastStreamRecvError::Lagged(n)) => {
tracing::error!("lagged messages: {n}");
None
} }
} }
Err(RecvError::Lagged(n)) => { }),
tracing::error!("lagged messages: {n}") ))
}
Err(RecvError::Closed) => unreachable!(),
}
}
});
Self {
network_relay,
blocks,
}
}
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>> {
Box::new(BroadcastStream::new(self.blocks.subscribe()).filter_map(Result::ok))
} }
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>) { async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>) {

View File

@ -6,6 +6,7 @@ use std::hash::Hash;
// crates // crates
use futures::Stream; use futures::Stream;
use nomos_core::block::Block; use nomos_core::block::Block;
use overwatch_rs::DynError;
// internal // internal
use crate::network::messages::NetworkMessage; use crate::network::messages::NetworkMessage;
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
@ -24,6 +25,8 @@ pub trait NetworkAdapter {
async fn new( async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self; ) -> Self;
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>>; async fn blocks_stream(
&self,
) -> Result<BoxedStream<Block<Self::Tx, Self::BlobCertificate>>, DynError>;
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>); async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>);
} }