Remove unnecessary loop

This commit is contained in:
danielSanchezQ 2024-07-30 08:59:09 +02:00
parent d7b26d2723
commit 8e2481d4d7
2 changed files with 65 additions and 41 deletions

View File

@ -8,9 +8,11 @@ use libp2p::{PeerId, Stream, StreamProtocol};
use libp2p::core::upgrade::ReadyUpgrade; use libp2p::core::upgrade::ReadyUpgrade;
use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol}; use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol};
use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound};
use log::trace;
use tracing::{debug, error}; use tracing::{debug, error};
use nomos_da_messages::{pack_message, unpack_from_reader}; use nomos_da_messages::{pack_message, unpack_from_reader};
use nomos_da_messages::common::Blob;
use crate::protocol::PROTOCOL_NAME; use crate::protocol::PROTOCOL_NAME;
@ -79,13 +81,20 @@ impl ReplicationHandler {
&mut self, &mut self,
mut stream: Stream, mut stream: Stream,
) -> impl Future<Output = Result<Stream, Error>> { ) -> impl Future<Output = Result<Stream, Error>> {
let mut pending_messages = Vec::new(); // let mut pending_messages = Vec::new();
std::mem::swap(&mut self.outgoing_messages, &mut pending_messages); // std::mem::swap(&mut self.outgoing_messages, &mut pending_messages);
// async {
// for message in pending_messages {
// stream.write_all(&pack_message(&message)?).await?;
// }
// stream.flush().await?;
// Ok(stream)
// }
trace!("launched sending pending messages");
async { async {
for message in pending_messages { trace!("Sending pending messages");
stream.write_all(&pack_message(&message)?).await?; stream.write_all(b"Hello world").await?;
} trace!("finished sending messages");
stream.flush().await?;
Ok(stream) Ok(stream)
} }
} }
@ -94,9 +103,26 @@ impl ReplicationHandler {
&mut self, &mut self,
mut stream: Stream, mut stream: Stream,
) -> impl Future<Output = Result<(DaMessage, Stream), Error>> { ) -> impl Future<Output = Result<(DaMessage, Stream), Error>> {
// async move {
// let msg: DaMessage = unpack_from_reader(&mut stream).await?;
// Ok((msg, stream))
// }
trace!("launched receiving messages");
let mut msg = b"Hello world".to_owned();
async move { async move {
let msg: DaMessage = unpack_from_reader(&mut stream).await?; trace!("reading messages");
Ok((msg, stream)) stream.read(msg.as_mut_slice()).await?;
trace!("finished reading messages");
Ok((
DaMessage {
blob: Some(Blob {
blob_id: msg.to_vec(),
data: msg.to_vec(),
}),
subnetwork_id: 0,
},
stream,
))
} }
} }
@ -130,12 +156,10 @@ impl ReplicationHandler {
Option<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), HandlerEventToBehaviour>>, Option<ConnectionHandlerEvent<ReadyUpgrade<StreamProtocol>, (), HandlerEventToBehaviour>>,
Error, Error,
> { > {
loop {
// Propagate incoming messages // Propagate incoming messages
match self.outbound.take() { match self.outbound.take() {
Some(OutboundState::OpenStream) => { Some(OutboundState::OpenStream) => {
self.outbound = Some(OutboundState::OpenStream); self.outbound = Some(OutboundState::OpenStream);
break;
} }
Some(OutboundState::Idle(stream)) => { Some(OutboundState::Idle(stream)) => {
if !self.outgoing_messages.is_empty() { if !self.outgoing_messages.is_empty() {
@ -145,18 +169,18 @@ impl ReplicationHandler {
} else { } else {
self.outbound = Some(OutboundState::Idle(stream)); self.outbound = Some(OutboundState::Idle(stream));
} }
break;
} }
Some(OutboundState::Sending(mut future)) => match future.poll_unpin(cx) { Some(OutboundState::Sending(mut future)) => match future.poll_unpin(cx) {
Poll::Ready(Ok(stream)) => { Poll::Ready(Ok(stream)) => {
trace!("finished writting messages");
self.outbound = Some(OutboundState::Idle(stream)); self.outbound = Some(OutboundState::Idle(stream));
break;
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
error!("{e:?}"); error!("{e:?}");
return Err(e);
} }
Poll::Pending => {} Poll::Pending => {
trace!("Keep writting messages");
}
}, },
None => { None => {
self.outbound = Some(OutboundState::OpenStream); self.outbound = Some(OutboundState::OpenStream);
@ -165,7 +189,6 @@ impl ReplicationHandler {
})); }));
} }
} }
}
Ok(None) Ok(None)
} }
} }

View File

@ -12,6 +12,7 @@ mod test {
use libp2p::core::upgrade::Version; use libp2p::core::upgrade::Version;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use log::info;
use tracing::trace; use tracing::trace;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::TestWriter; use tracing_subscriber::fmt::TestWriter;
@ -119,7 +120,7 @@ mod test {
} }
SwarmEvent::Behaviour(event) => trace!("1 - {event:?}"), SwarmEvent::Behaviour(event) => trace!("1 - {event:?}"),
event => { event => {
trace!("1 - Swarmevent: {event:?}"); info!("1 - Swarmevent: {event:?}");
} }
} }
} }
@ -136,7 +137,7 @@ mod test {
} }
SwarmEvent::Behaviour(event) => trace!("2 - {event:?}"), SwarmEvent::Behaviour(event) => trace!("2 - {event:?}"),
event => { event => {
trace!("2 - Swarmevent: {event:?}"); info!("2 - Swarmevent: {event:?}");
} }
} }
swarm_2.behaviour_mut().send_message(DaMessage { swarm_2.behaviour_mut().send_message(DaMessage {