diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 075bde86..91c47c37 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] async-trait = "0.1" bincode = { version = "2.0.0-rc.2", features = ["serde"] } +chrono = "0.4" futures = "0.3" linked-hash-map = { version = "0.5.6", optional = true } nomos-network = { path = "../network", features = ["waku"] } diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 2887da40..58e47f75 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -99,7 +99,7 @@ where P: MemPool + Send + 'static, P::Settings: Clone + Send + Sync + 'static, P::Id: Debug + Send + 'static, - P::Tx: Debug + Send + 'static, + P::Tx: Clone + Debug + Send + Sync + 'static, N: NetworkAdapter + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { @@ -132,9 +132,14 @@ where Some(msg) = service_state.inbound_relay.recv() => { match msg { MempoolMsg::AddTx { tx } => { - pool.add_tx(tx).unwrap_or_else(|e| { - tracing::debug!("could not add tx to the pool due to: {}", e) - }); + match pool.add_tx(tx.clone()) { + Ok(_id) => { + adapter.send_transaction(tx).await; + } + Err(e) => { + tracing::debug!("could not add tx to the pool due to: {}", e); + } + } } MempoolMsg::View { ancestor_hint, tx } => { tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index c9549821..6aad88eb 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -3,7 +3,9 @@ use std::marker::PhantomData; // crates use futures::{Stream, StreamExt}; -use nomos_network::backends::mock::{EventKind, Mock, MockBackendMessage, NetworkEvent}; +use nomos_network::backends::mock::{ + EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent, +}; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; @@ -15,6 +17,7 @@ use crate::network::NetworkAdapter; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; const MOCK_CONTENT_TOPIC: &str = "MockContentTopic"; +const MOCK_TX_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("Mock", 1, "Tx"); pub struct MockAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, @@ -24,7 +27,7 @@ pub struct MockAdapter { #[async_trait::async_trait] impl NetworkAdapter for MockAdapter where - Tx: From + DeserializeOwned + Send + Sync + 'static, + Tx: From + Into + DeserializeOwned + Send + Sync + 'static, { type Backend = Mock; type Tx = Tx; @@ -54,10 +57,7 @@ where })) .await { - panic!( - "Couldn't send subscribe message to the network service: {}", - e - ); + panic!("Couldn't send subscribe message to the network service: {e}",); }; Self { network_relay, @@ -95,4 +95,22 @@ where }, ))) } + + async fn send_transaction(&self, tx: Self::Tx) { + if let Err((e, _e)) = self + .network_relay + .send(NetworkMsg::Process(MockBackendMessage::Broadcast { + msg: MockMessage::new( + tx.into(), + MOCK_TX_CONTENT_TOPIC, + 1, + chrono::Utc::now().timestamp() as usize, + ), + topic: MOCK_PUB_SUB_TOPIC, + })) + .await + { + tracing::error!(err = ?e); + }; + } } diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs index 50cbb123..7fb6cfb0 100644 --- a/nomos-services/mempool/src/network/adapters/waku.rs +++ b/nomos-services/mempool/src/network/adapters/waku.rs @@ -1,6 +1,5 @@ -use bincode::config::{Fixint, LittleEndian, NoLimit, WriteFixedArrayLength}; -use std::marker::PhantomData; // std +use std::marker::PhantomData; // crates use futures::{Stream, StreamExt}; use serde::de::DeserializeOwned; @@ -8,11 +7,13 @@ use tokio_stream::wrappers::BroadcastStream; // internal use crate::network::messages::TransactionMsg; use crate::network::NetworkAdapter; +use nomos_core::wire; use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}; use nomos_network::{NetworkMsg, NetworkService}; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; +use serde::Serialize; +use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = WakuPubSubTopic::new("CarnotSim", Encoding::Proto); @@ -28,7 +29,7 @@ pub struct WakuAdapter { #[async_trait::async_trait] impl NetworkAdapter for WakuAdapter where - Tx: DeserializeOwned + Send + Sync + 'static, + Tx: DeserializeOwned + Serialize + Send + Sync + 'static, { type Backend = Waku; type Tx = Tx; @@ -69,21 +70,9 @@ where |event| async move { match event { Ok(NetworkEvent::RawMessage(message)) => { - if message.content_topic().content_topic_name - == WAKU_CARNOT_TX_CONTENT_TOPIC.content_topic_name - { - let (tx, _): (TransactionMsg, _) = - // TODO: This should be temporary, we can probably extract this so we can use/try/test a variety of encodings - bincode::serde::decode_from_slice( - message.payload(), - bincode::config::Configuration::< - LittleEndian, - Fixint, - WriteFixedArrayLength, - NoLimit, - >::default(), - ) - .unwrap(); + if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC { + let tx: TransactionMsg = + wire::deserializer(message.payload()).deserialize().unwrap(); Some(tx.tx) } else { None @@ -94,4 +83,23 @@ where }, ))) } + + async fn send_transaction(&self, tx: Self::Tx) { + let payload = wire::serialize(&tx).expect("Tx serialization failed"); + if let Err((_, _e)) = self + .network_relay + .send(NetworkMsg::Process(WakuBackendMessage::Broadcast { + message: WakuMessage::new( + payload, + WAKU_CARNOT_TX_CONTENT_TOPIC.clone(), + 1, + chrono::Utc::now().timestamp() as usize, + ), + topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()), + })) + .await + { + todo!("log error"); + }; + } } diff --git a/nomos-services/mempool/src/network/mod.rs b/nomos-services/mempool/src/network/mod.rs index 8c56d973..5d8d34d0 100644 --- a/nomos-services/mempool/src/network/mod.rs +++ b/nomos-services/mempool/src/network/mod.rs @@ -19,4 +19,5 @@ pub trait NetworkAdapter { network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self; async fn transactions_stream(&self) -> Box + Unpin + Send>; + async fn send_transaction(&self, tx: Self::Tx); } diff --git a/nomos-services/network/src/backends/mock.rs b/nomos-services/network/src/backends/mock.rs index e4bb2597..4a693383 100644 --- a/nomos-services/network/src/backends/mock.rs +++ b/nomos-services/network/src/backends/mock.rs @@ -140,13 +140,13 @@ impl core::fmt::Debug for MockBackendMessage { match self { Self::BootProducer { .. } => write!(f, "BootProducer"), Self::Broadcast { topic, msg } => { - write!(f, "Broadcast {{ topic: {}, msg: {:?} }}", topic, msg) + write!(f, "Broadcast {{ topic: {topic}, msg: {msg:?} }}") } - Self::RelaySubscribe { topic } => write!(f, "RelaySubscribe {{ topic: {} }}", topic), + Self::RelaySubscribe { topic } => write!(f, "RelaySubscribe {{ topic: {topic} }}"), Self::RelayUnSubscribe { topic } => { - write!(f, "RelayUnSubscribe {{ topic: {} }}", topic) + write!(f, "RelayUnSubscribe {{ topic: {topic} }}") } - Self::Query { topic, .. } => write!(f, "Query {{ topic: {} }}", topic), + Self::Query { topic, .. } => write!(f, "Query {{ topic: {topic} }}"), } } }