diff --git a/nomos-da/network/core/src/replication/handler.rs b/nomos-da/network/core/src/replication/handler.rs index 810893fb..e275ed93 100644 --- a/nomos-da/network/core/src/replication/handler.rs +++ b/nomos-da/network/core/src/replication/handler.rs @@ -8,9 +8,11 @@ use libp2p::{PeerId, Stream, StreamProtocol}; use libp2p::core::upgrade::ReadyUpgrade; use libp2p::swarm::{ConnectionHandler, ConnectionHandlerEvent, SubstreamProtocol}; use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; +use log::trace; use tracing::{debug, error}; use nomos_da_messages::{pack_message, unpack_from_reader}; +use nomos_da_messages::common::Blob; use crate::protocol::PROTOCOL_NAME; @@ -79,13 +81,20 @@ impl ReplicationHandler { &mut self, mut stream: Stream, ) -> impl Future> { - let mut pending_messages = Vec::new(); - std::mem::swap(&mut self.outgoing_messages, &mut pending_messages); + // let mut pending_messages = Vec::new(); + // std::mem::swap(&mut self.outgoing_messages, &mut pending_messages); + // async { + // for message in pending_messages { + // stream.write_all(&pack_message(&message)?).await?; + // } + // stream.flush().await?; + // Ok(stream) + // } + trace!("launched sending pending messages"); async { - for message in pending_messages { - stream.write_all(&pack_message(&message)?).await?; - } - stream.flush().await?; + trace!("Sending pending messages"); + stream.write_all(b"Hello world").await?; + trace!("finished sending messages"); Ok(stream) } } @@ -94,9 +103,26 @@ impl ReplicationHandler { &mut self, mut stream: Stream, ) -> impl Future> { + // async move { + // let msg: DaMessage = unpack_from_reader(&mut stream).await?; + // Ok((msg, stream)) + // } + trace!("launched receiving messages"); + let mut msg = b"Hello world".to_owned(); async move { - let msg: DaMessage = unpack_from_reader(&mut stream).await?; - Ok((msg, stream)) + trace!("reading messages"); + stream.read(msg.as_mut_slice()).await?; + trace!("finished reading messages"); + Ok(( + DaMessage { + blob: Some(Blob { + blob_id: msg.to_vec(), + data: msg.to_vec(), + }), + subnetwork_id: 0, + }, + stream, + )) } } @@ -130,40 +156,37 @@ impl ReplicationHandler { Option, (), HandlerEventToBehaviour>>, Error, > { - loop { - // Propagate incoming messages - match self.outbound.take() { - Some(OutboundState::OpenStream) => { - self.outbound = Some(OutboundState::OpenStream); - break; + // Propagate incoming messages + match self.outbound.take() { + Some(OutboundState::OpenStream) => { + self.outbound = Some(OutboundState::OpenStream); + } + Some(OutboundState::Idle(stream)) => { + if !self.outgoing_messages.is_empty() { + self.outbound = Some(OutboundState::Sending( + self.send_pending_messages(stream).boxed(), + )); + } else { + self.outbound = Some(OutboundState::Idle(stream)); } - Some(OutboundState::Idle(stream)) => { - if !self.outgoing_messages.is_empty() { - self.outbound = Some(OutboundState::Sending( - self.send_pending_messages(stream).boxed(), - )); - } else { - self.outbound = Some(OutboundState::Idle(stream)); - } - break; + } + Some(OutboundState::Sending(mut future)) => match future.poll_unpin(cx) { + Poll::Ready(Ok(stream)) => { + trace!("finished writting messages"); + self.outbound = Some(OutboundState::Idle(stream)); } - Some(OutboundState::Sending(mut future)) => match future.poll_unpin(cx) { - Poll::Ready(Ok(stream)) => { - self.outbound = Some(OutboundState::Idle(stream)); - break; - } - Poll::Ready(Err(e)) => { - error!("{e:?}"); - return Err(e); - } - Poll::Pending => {} - }, - None => { - self.outbound = Some(OutboundState::OpenStream); - return Ok(Some(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: ::listen_protocol(self), - })); + Poll::Ready(Err(e)) => { + error!("{e:?}"); } + Poll::Pending => { + trace!("Keep writting messages"); + } + }, + None => { + self.outbound = Some(OutboundState::OpenStream); + return Ok(Some(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: ::listen_protocol(self), + })); } } Ok(None) diff --git a/nomos-da/network/core/src/replication/mod.rs b/nomos-da/network/core/src/replication/mod.rs index 7c5b0ace..b5a6417e 100644 --- a/nomos-da/network/core/src/replication/mod.rs +++ b/nomos-da/network/core/src/replication/mod.rs @@ -12,6 +12,7 @@ mod test { use libp2p::core::upgrade::Version; use libp2p::identity::Keypair; use libp2p::swarm::SwarmEvent; + use log::info; use tracing::trace; use tracing_subscriber::EnvFilter; use tracing_subscriber::fmt::TestWriter; @@ -119,7 +120,7 @@ mod test { } SwarmEvent::Behaviour(event) => trace!("1 - {event:?}"), event => { - trace!("1 - Swarmevent: {event:?}"); + info!("1 - Swarmevent: {event:?}"); } } } @@ -136,7 +137,7 @@ mod test { } SwarmEvent::Behaviour(event) => trace!("2 - {event:?}"), event => { - trace!("2 - Swarmevent: {event:?}"); + info!("2 - Swarmevent: {event:?}"); } } swarm_2.behaviour_mut().send_message(DaMessage {