use type alias for network adapter (#256)

This commit is contained in:
Giacomo Pasini 2023-07-12 13:33:58 +02:00 committed by GitHub
parent 7a776af530
commit 4745b99996
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 65 deletions

View File

@ -6,12 +6,12 @@ use nomos_network::{
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use tokio_stream::{wrappers::BroadcastStream, Stream};
use tokio_stream::wrappers::BroadcastStream;
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
BoxedStream, NetworkAdapter,
};
use consensus_engine::{BlockId, Committee, View};
@ -57,10 +57,7 @@ impl NetworkAdapter for MockAdapter {
Self { network_relay }
}
async fn proposal_chunks_stream(
&self,
_view: View,
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin> {
async fn proposal_chunks_stream(&self, _view: View) -> BoxedStream<ProposalChunkMsg> {
let stream_channel = self
.message_subscriber_channel()
.await
@ -110,27 +107,15 @@ impl NetworkAdapter for MockAdapter {
todo!()
}
async fn timeout_stream(
&self,
_committee: &Committee,
_view: View,
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin> {
async fn timeout_stream(&self, _committee: &Committee, _view: View) -> BoxedStream<TimeoutMsg> {
todo!()
}
async fn timeout_qc_stream(
&self,
_view: View,
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin> {
async fn timeout_qc_stream(&self, _view: View) -> BoxedStream<TimeoutQcMsg> {
todo!()
}
async fn votes_stream(
&self,
_committee: &Committee,
_view: View,
_proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin> {
async fn votes_stream(&self, _: &Committee, _: View, _: BlockId) -> BoxedStream<VoteMsg> {
let stream_channel = self
.message_subscriber_channel()
.await
@ -156,11 +141,7 @@ impl NetworkAdapter for MockAdapter {
)))
}
async fn new_view_stream(
&self,
_committee: &Committee,
_view: View,
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin> {
async fn new_view_stream(&self, _: &Committee, _view: View) -> BoxedStream<NewViewMsg> {
todo!()
}

View File

@ -10,7 +10,7 @@ use tokio_stream::wrappers::BroadcastStream;
use crate::network::messages::{NewViewMsg, TimeoutMsg, TimeoutQcMsg};
use crate::network::{
messages::{ProposalChunkMsg, VoteMsg},
NetworkAdapter,
BoxedStream, NetworkAdapter,
};
use consensus_engine::{BlockId, Committee, View};
use nomos_network::{
@ -57,10 +57,7 @@ impl WakuAdapter {
async fn archive_subscriber_stream(
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
content_topic: WakuContentTopic,
) -> Result<
Box<dyn Stream<Item = WakuMessage> + Send + Sync + Unpin>,
tokio::sync::oneshot::error::RecvError,
> {
) -> Result<BoxedStream<WakuMessage>, tokio::sync::oneshot::error::RecvError> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = network_relay
.send(NetworkMsg::Process(WakuBackendMessage::ArchiveSubscribe {
@ -167,10 +164,7 @@ impl NetworkAdapter for WakuAdapter {
Self { network_relay }
}
async fn proposal_chunks_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin> {
async fn proposal_chunks_stream(&self, view: View) -> BoxedStream<ProposalChunkMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(PROPOSAL_CONTENT_TOPIC)
.await
@ -214,11 +208,7 @@ impl NetworkAdapter for WakuAdapter {
.await
}
async fn timeout_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin> {
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg> {
let content_topic = create_topic("timeout", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
@ -237,10 +227,7 @@ impl NetworkAdapter for WakuAdapter {
))
}
async fn timeout_qc_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin> {
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg> {
Box::new(Box::pin(
self.cached_stream_with_content_topic(TIMEOUT_QC_CONTENT_TOPIC)
.await
@ -263,7 +250,7 @@ impl NetworkAdapter for WakuAdapter {
committee: &Committee,
view: View,
proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin> {
) -> BoxedStream<VoteMsg> {
let content_topic = create_topic("votes", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)
@ -282,11 +269,7 @@ impl NetworkAdapter for WakuAdapter {
))
}
async fn new_view_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin> {
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg> {
let content_topic = create_topic("new-view", committee, view);
Box::new(Box::pin(
self.cached_stream_with_content_topic(content_topic)

View File

@ -12,6 +12,8 @@ use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
type BoxedStream<T> = Box<dyn Stream<Item = T> + Send + Sync + Unpin>;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static;
@ -24,25 +26,14 @@ pub trait NetworkAdapter {
) -> Box<dyn Stream<Item = ProposalChunkMsg> + Send + Sync + Unpin>;
async fn broadcast_block_chunk(&self, chunk_msg: ProposalChunkMsg);
async fn broadcast_timeout_qc(&self, timeout_qc_msg: TimeoutQcMsg);
async fn timeout_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = TimeoutMsg> + Send + Sync + Unpin>;
async fn timeout_qc_stream(
&self,
view: View,
) -> Box<dyn Stream<Item = TimeoutQcMsg> + Send + Sync + Unpin>;
async fn timeout_stream(&self, committee: &Committee, view: View) -> BoxedStream<TimeoutMsg>;
async fn timeout_qc_stream(&self, view: View) -> BoxedStream<TimeoutQcMsg>;
async fn votes_stream(
&self,
committee: &Committee,
view: View,
proposal_id: BlockId,
) -> Box<dyn Stream<Item = VoteMsg> + Send + Unpin>;
async fn new_view_stream(
&self,
committee: &Committee,
view: View,
) -> Box<dyn Stream<Item = NewViewMsg> + Send + Unpin>;
) -> BoxedStream<VoteMsg>;
async fn new_view_stream(&self, committee: &Committee, view: View) -> BoxedStream<NewViewMsg>;
async fn send(&self, committee: &Committee, view: View, payload: Box<[u8]>, channel: &str);
}