diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index ab03b4ad..70ff5784 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -13,6 +13,7 @@ async-trait = { version = "0.1" } bytes = "1.3" futures = "0.3" raptorq = { version = "1.7", optional = true } +serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" [dev-dependencies] @@ -22,4 +23,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] } [features] default = [] -raptor = ["raptorq"] \ No newline at end of file +raptor = ["raptorq"] diff --git a/nomos-services/consensus/src/network/adapters/waku.rs b/nomos-services/consensus/src/network/adapters/waku.rs index 9db732a8..1fb235db 100644 --- a/nomos-services/consensus/src/network/adapters/waku.rs +++ b/nomos-services/consensus/src/network/adapters/waku.rs @@ -72,25 +72,23 @@ impl NetworkAdapter for WakuAdapter { Box::new( BroadcastStream::new(stream_channel).filter_map(|msg| async move { match msg { - Ok(event) => match event { - NetworkEvent::RawMessage(message) => { - // TODO: this should actually check the whole content topic, - // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) - if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name - == message.content_topic().content_topic_name - { - let payload = message.payload(); - Some( - ProposalChunkMsg::from_bytes::( - payload.try_into().unwrap(), - ) - .chunk, + Ok(NetworkEvent::RawMessage(message)) => { + // TODO: this should actually check the whole content topic, + // waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28) + if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name + == message.content_topic().content_topic_name + { + let payload = message.payload(); + Some( + ProposalChunkMsg::from_bytes::( + payload.try_into().unwrap(), ) - } else { - None - } + .chunk, + ) + } else { + None } - }, + } Err(_e) => None, } }), diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index eb303cb2..0b8c9e8d 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -103,7 +103,8 @@ impl ServiceCore for Logger { async fn run(self) -> Result<(), overwatch_rs::DynError> { // keep the handle alive without stressing the runtime - Ok(futures::pending!()) + futures::pending!(); + Ok(()) } } diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index c9232138..8966508c 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -7,9 +7,18 @@ edition = "2021" [dependencies] async-trait = "0.1" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +bincode = { version = "2.0.0-rc.2", features = ["serde"] } +futures = "0.3" nomos-network = { path = "../network", features = ["waku"] } nomos-core = { path = "../../nomos-core" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +serde = { version = "1.0", features = ["derive"] } tracing = "0.1" tokio = { version = "1", features = ["sync"] } -futures = "0.3" \ No newline at end of file +tokio-stream = "0.1" +waku-bindings = { version = "0.1.0-beta1", optional = true} + + +[features] +default = [] +waku = ["nomos-network/waku", "waku-bindings"] \ No newline at end of file diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 4d84fe62..54b28feb 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -1,14 +1,15 @@ use nomos_core::block::{BlockHeader, BlockId}; -pub trait Pool { +pub trait MemPool { + type Settings; type Tx; type Id; /// Construct a new empty pool - fn new() -> Self; + fn new(settings: Self::Settings) -> Self; /// Add a new transaction to the mempool, for example because we received it from the network - fn add_tx(&mut self, tx: Self::Tx, id: Self::Id) -> Result<(), overwatch_rs::DynError>; + fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError>; /// Return a view over the transactions contained in the mempool. /// Implementations should provide *at least* all the transactions which have not been marked as diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 5d2ffddd..add1662a 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -1,18 +1,18 @@ -/// std +pub mod backend; +pub mod network; + +// std use std::fmt::{Debug, Error, Formatter}; -/// crates -use tokio::sync::broadcast::Receiver; +// crates +use futures::StreamExt; use tokio::sync::oneshot::Sender; -/// internal -pub mod backend; -use backend::Pool; +// internal +use crate::network::NetworkAdapter; +use backend::MemPool; use nomos_core::block::{BlockHeader, BlockId}; -use nomos_network::{ - backends::{waku::NetworkEvent, NetworkBackend}, - NetworkService, -}; +use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::{OutboundRelay, Relay, RelayMessage}, @@ -20,19 +20,21 @@ use overwatch_rs::services::{ ServiceCore, ServiceData, ServiceId, }; -pub struct Mempool -where +pub struct MempoolService< + N: NetworkAdapter + Send + Sync + 'static, + P: MemPool + Send + Sync + 'static, +> where + P::Settings: Clone + Send + Sync + 'static, P::Tx: Debug + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static, { service_state: ServiceStateHandle, - network_relay: Relay>, + network_relay: Relay>, pool: P, } pub enum MempoolMsg { AddTx { - id: Id, tx: Tx, }, View { @@ -58,7 +60,7 @@ impl Debug for MempoolMsg { ancestor_hint ) } - Self::AddTx { id, tx } => write!(f, "MempoolMsg::AddTx{{id: {:?}, tx: {:?}}}", id, tx), + Self::AddTx { tx } => write!(f, "MempoolMsg::AddTx{{tx: {:?}}}", tx), Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {:?}}}", ids), Self::MarkInBlock { ids, block } => { write!( @@ -73,43 +75,45 @@ impl Debug for MempoolMsg { impl RelayMessage for MempoolMsg {} -impl ServiceData for Mempool +impl ServiceData for MempoolService where - N: NetworkBackend + Send + Sync + 'static, - P: Pool + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, + P: MemPool + Send + Sync + 'static, + P::Settings: Clone + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static, P::Tx: Debug + Send + Sync + 'static, { const SERVICE_ID: ServiceId = "Mempool"; - type Settings = (); + type Settings = P::Settings; type State = NoState; type StateOperator = NoOperator; - type Message = MempoolMsg<

::Tx,

::Id>; + type Message = MempoolMsg<

::Tx,

::Id>; } #[async_trait::async_trait] -impl ServiceCore for Mempool +impl ServiceCore for MempoolService where - P: Pool + Send + Sync + 'static, + P: MemPool + Send + Sync + 'static, + P::Settings: Clone + Send + Sync + 'static, P::Id: Debug + Send + Sync + 'static, P::Tx: Debug + Send + Sync + 'static, - N: NetworkBackend + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); + let pool_settings = service_state.settings_reader.get_updated_settings(); Ok(Self { service_state, network_relay, - pool: P::new(), + pool: P::new(pool_settings), }) } - #[allow(unreachable_code, unused, clippy::diverging_sub_expression)] async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { - service_state, + mut service_state, network_relay, - pool, + mut pool, } = self; let network_relay: OutboundRelay<_> = network_relay @@ -117,32 +121,33 @@ where .await .expect("Relay connection with NetworkService should succeed"); - // Separate function so that we can specialize it for different network backends - let network_txs: Receiver = todo!(); + let adapter = N::new(network_relay).await; + let mut network_txs = adapter.transactions_stream().await; loop { tokio::select! { Some(msg) = service_state.inbound_relay.recv() => { match msg { - MempoolMsg::AddTx { id, tx } => { - pool.add_tx(tx, id).unwrap_or_else(|e| { + MempoolMsg::AddTx { tx } => { + pool.add_tx(tx).unwrap_or_else(|e| { tracing::debug!("could not add tx to the pool due to: {}", e) }); } MempoolMsg::View { ancestor_hint, rx } => { rx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { tracing::debug!("could not send back pool view") - }) + }); } MempoolMsg::MarkInBlock { ids, block } => { pool.mark_in_block(ids, block); } - MempoolMsg::Prune { ids } => pool.prune(ids), + MempoolMsg::Prune { ids } => { pool.prune(ids); }, } } - Ok(msg) = network_txs.recv() => { - // filter incoming transactions and add them to the pool - todo!() + Some(tx) = network_txs.next() => { + pool.add_tx(tx).unwrap_or_else(|e| { + tracing::debug!("could not add tx to the pool due to: {}", e) + }); } } } diff --git a/nomos-services/mempool/src/network/adapters/mod.rs b/nomos-services/mempool/src/network/adapters/mod.rs new file mode 100644 index 00000000..ac25906e --- /dev/null +++ b/nomos-services/mempool/src/network/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "waku")] +pub mod waku; diff --git a/nomos-services/mempool/src/network/adapters/waku.rs b/nomos-services/mempool/src/network/adapters/waku.rs new file mode 100644 index 00000000..bf40bd8b --- /dev/null +++ b/nomos-services/mempool/src/network/adapters/waku.rs @@ -0,0 +1,100 @@ +use bincode::config::{Fixint, LittleEndian, NoLimit, WriteFixedArrayLength}; +use std::marker::PhantomData; +// std +// crates +use futures::{Stream, StreamExt}; +use serde::de::DeserializeOwned; +use tokio_stream::wrappers::BroadcastStream; +// internal +use crate::network::messages::TransactionMsg; +use crate::network::NetworkAdapter; +use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; + +static WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = + WakuPubSubTopic::new("CarnotSim", Encoding::Proto); + +static WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = + WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto); + +pub struct WakuAdapter { + network_relay: OutboundRelay< as ServiceData>::Message>, + _tx: PhantomData, +} + +#[async_trait::async_trait] +impl NetworkAdapter for WakuAdapter +where + Tx: DeserializeOwned + Send + Sync + 'static, +{ + type Backend = Waku; + type Tx = Tx; + + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + // Subscribe to the carnot pubsub topic + if let Err((e, _)) = network_relay + .send(NetworkMsg::Process(WakuBackendMessage::RelaySubscribe { + topic: WAKU_CARNOT_PUB_SUB_TOPIC.clone(), + })) + .await + { + // We panic, but as we could try to reconnect later it should not be + // a problem. But definitely something to consider. + panic!( + "Couldn't send subscribe message to the network service: {}", + e + ); + }; + Self { + network_relay, + _tx: Default::default(), + } + } + async fn transactions_stream(&self) -> Box + Unpin + Send> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + if let Err((_, _e)) = self + .network_relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + { + todo!("log error"); + }; + let receiver = receiver.await.unwrap(); + Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( + |event| async move { + match event { + Ok(NetworkEvent::RawMessage(message)) => { + if message.content_topic().content_topic_name + == WAKU_CARNOT_TX_CONTENT_TOPIC.content_topic_name + { + let (tx, _): (TransactionMsg, _) = + // TODO: This should be temporary, we can probably extract this so we can use/try/test a variety of encodings + bincode::serde::decode_from_slice( + message.payload(), + bincode::config::Configuration::< + LittleEndian, + Fixint, + WriteFixedArrayLength, + NoLimit, + >::default(), + ) + .unwrap(); + Some(tx.tx) + } else { + None + } + } + Err(_e) => None, + } + }, + ))) + } +} diff --git a/nomos-services/mempool/src/network/messages.rs b/nomos-services/mempool/src/network/messages.rs new file mode 100644 index 00000000..0dbdb9b6 --- /dev/null +++ b/nomos-services/mempool/src/network/messages.rs @@ -0,0 +1,9 @@ +// std +// crates +use serde::{Deserialize, Serialize}; +// internal + +#[derive(Serialize, Deserialize)] +pub struct TransactionMsg { + pub tx: Tx, +} diff --git a/nomos-services/mempool/src/network/mod.rs b/nomos-services/mempool/src/network/mod.rs new file mode 100644 index 00000000..bcb190d8 --- /dev/null +++ b/nomos-services/mempool/src/network/mod.rs @@ -0,0 +1,22 @@ +pub mod adapters; +mod messages; + +// std + +// crates +use futures::Stream; +// internal +use nomos_network::backends::NetworkBackend; +use nomos_network::NetworkService; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; + +#[async_trait::async_trait] +pub trait NetworkAdapter { + type Backend: NetworkBackend + Send + Sync + 'static; + type Tx: Send + Sync + 'static; + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self; + async fn transactions_stream(&self) -> Box + Unpin + Send>; +}