From 818d7f29cdb8bd50f2381d08ab1c8c7abb876e79 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 15 Mar 2023 00:55:08 +0800 Subject: [PATCH] Add generic block (#93) * add generic block * fix PR comments * Block uses tx hashes * Refactor bounds and generics to accept block type * remove Tx generics * add generic for block * Remove unnecessary bounds on leadership * Impl from with ownership for mock tx and txid * feature gate --------- Co-authored-by: danielsanchezq --- .DS_Store | Bin 0 -> 6148 bytes nomos-core/Cargo.toml | 1 + nomos-core/src/block.rs | 37 +++++--- nomos-core/src/tx/carnot.rs | 10 +++ nomos-core/src/tx/mock.rs | 70 +++++++++++++-- nomos-core/src/tx/mod.rs | 9 +- nomos-services/consensus/Cargo.toml | 2 +- nomos-services/consensus/src/leadership.rs | 22 +++-- nomos-services/consensus/src/lib.rs | 66 +++++++++----- .../consensus/src/network/adapters/mock.rs | 4 +- .../consensus/src/overlay/committees.rs | 48 ++++++---- nomos-services/consensus/src/overlay/flat.rs | 34 ++++--- nomos-services/consensus/src/overlay/mod.rs | 8 +- nomos-services/mempool/Cargo.toml | 2 +- .../mempool/src/backend/mockpool.rs | 10 +++ nomos-services/mempool/src/backend/mod.rs | 7 ++ nomos-services/mempool/src/lib.rs | 15 ++++ .../mempool/src/network/adapters/mock.rs | 12 +-- nomos-services/mempool/tests/mock.rs | 17 ++-- nomos-services/network/src/backends/mock.rs | 85 ++++++++++-------- 20 files changed, 313 insertions(+), 146 deletions(-) create mode 100644 .DS_Store create mode 100644 nomos-core/src/tx/carnot.rs diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..f1112a46af5356070b3e3426e2915aae5b41750c GIT binary patch literal 6148 zcmeHKyG{c^3>-s>NJuCp<^BK#f3S+e7w`j!gaRpW5+_n$#dq;(j2{KjK^4(NW63+a zUXM>V#rX`t)*rh&U6nG}!$Qa}nw0V!~@0@YHNS0`6e0VyB_&P@UTJ~X;xFPswN)4?T1 z0OEq-Fz#cPAT|#Wd*PJG2+fj8Osdt0VM%AaRbDTg5|a+A;lt|5RuhWF(^$=WSb@ttuDt)>)Bl+Nk4ZX70V!}+3fN-vuvzn!s<+Nw&U { + header: BlockHeader, + transactions: IndexSet, +} /// A block header -#[derive(Clone, Debug)] -pub struct BlockHeader; +#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] +pub struct BlockHeader { + id: BlockId, +} /// Identifier of a block pub type BlockId = [u8; 32]; -impl Block { +impl Block { + pub fn new(header: BlockHeader, txs: impl Iterator) -> Self { + Self { + header, + transactions: txs.collect(), + } + } + /// Encode block into bytes pub fn as_bytes(&self) -> Bytes { Bytes::new() } - pub fn from_bytes(_: Bytes) -> Self { - Self + pub fn header(&self) -> BlockHeader { + self.header } - pub fn header(&self) -> BlockHeader { - BlockHeader + pub fn transactions(&self) -> impl Iterator + '_ { + self.transactions.iter() } } impl BlockHeader { pub fn id(&self) -> BlockId { - todo!() + self.id } } diff --git a/nomos-core/src/tx/carnot.rs b/nomos-core/src/tx/carnot.rs new file mode 100644 index 00000000..2523e87a --- /dev/null +++ b/nomos-core/src/tx/carnot.rs @@ -0,0 +1,10 @@ +// std +// crates +use serde::{Deserialize, Serialize}; +// internal +pub use crate::tx::transaction::Transaction; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Tx { + Transfer(Transaction), +} diff --git a/nomos-core/src/tx/mock.rs b/nomos-core/src/tx/mock.rs index f597e786..16ce87fa 100644 --- a/nomos-core/src/tx/mock.rs +++ b/nomos-core/src/tx/mock.rs @@ -1,23 +1,77 @@ +use crate::wire::serialize; use blake2::{ digest::{Update, VariableOutput}, Blake2bVar, }; +use nomos_network::backends::mock::MockMessage; -#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize)] -pub enum MockTransactionMsg { - Request(nomos_network::backends::mock::MockMessage), - Response(nomos_network::backends::mock::MockMessage), +#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +pub struct MockTransaction { + id: MockTxId, + content: MockMessage, } -#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)] +impl MockTransaction { + pub fn new(content: MockMessage) -> Self { + let id = MockTxId::from(content.clone()); + Self { id, content } + } + + pub fn message(&self) -> &MockMessage { + &self.content + } +} + +impl From for MockTransaction { + fn from(msg: nomos_network::backends::mock::MockMessage) -> Self { + let id = MockTxId::from(msg.clone()); + Self { id, content: msg } + } +} + +#[derive( + Debug, Eq, Hash, PartialEq, Ord, Copy, Clone, PartialOrd, serde::Serialize, serde::Deserialize, +)] pub struct MockTxId([u8; 32]); -impl From<&MockTransactionMsg> for MockTxId { - fn from(tx: &MockTransactionMsg) -> Self { +impl From<[u8; 32]> for MockTxId { + fn from(tx_id: [u8; 32]) -> Self { + Self(tx_id) + } +} + +impl core::ops::Deref for MockTxId { + type Target = [u8; 32]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsRef<[u8]> for MockTxId { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl MockTxId { + pub fn new(tx_id: [u8; 32]) -> MockTxId { + MockTxId(tx_id) + } +} + +impl From for MockTxId { + fn from(msg: nomos_network::backends::mock::MockMessage) -> Self { let mut hasher = Blake2bVar::new(32).unwrap(); - hasher.update(serde_json::to_string(tx).unwrap().as_bytes()); + hasher.update(&serialize(&msg).unwrap()); let mut id = [0u8; 32]; hasher.finalize_variable(&mut id).unwrap(); Self(id) } } + +impl From<&MockTransaction> for MockTxId { + fn from(msg: &MockTransaction) -> Self { + msg.id + } +} diff --git a/nomos-core/src/tx/mod.rs b/nomos-core/src/tx/mod.rs index 3d71e39e..fbfb9496 100644 --- a/nomos-core/src/tx/mod.rs +++ b/nomos-core/src/tx/mod.rs @@ -1,11 +1,4 @@ +pub mod carnot; #[cfg(feature = "mock")] pub mod mock; mod transaction; - -use serde::{Deserialize, Serialize}; -pub use transaction::Transaction; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Tx { - Transfer(Transaction), -} diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index f1c1d5ab..01572677 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -16,7 +16,7 @@ nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } rand_chacha = "0.3" rand = "0.8" -serde = "1.0" +serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" waku-bindings = { version = "0.1.0-rc.2", optional = true} diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index 799991ac..27af0e5a 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; // crates // internal -use nomos_core::crypto::PrivateKey; +use nomos_core::{block::BlockHeader, crypto::PrivateKey}; use nomos_mempool::MempoolMsg; use super::*; @@ -17,9 +17,9 @@ pub struct Leadership { mempool: OutboundRelay>, } -pub enum LeadershipResult<'view> { +pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> { Leader { - block: Block, + block: Block, _view: PhantomData<&'view u8>, }, NotLeader { @@ -27,7 +27,11 @@ pub enum LeadershipResult<'view> { }, } -impl Leadership { +impl Leadership +where + Id: Eq + core::hash::Hash, + for<'t> &'t Tx: Into, // TODO: we should probably abstract this away but for now the constrain may do +{ pub fn new(key: PrivateKey, mempool: OutboundRelay>) -> Self { Self { key: Enclave { key }, @@ -41,19 +45,21 @@ impl Leadership { view: &'view View, tip: &Tip, qc: Qc, - ) -> LeadershipResult<'view> { - let ancestor_hint = todo!("get the ancestor from the tip"); + ) -> LeadershipResult<'view, Id> { + // TODO: get the correct ancestor for the tip + // let ancestor_hint = todo!("get the ancestor from the tip"); + let ancestor_hint = [0; 32]; if view.is_leader(self.key.key) { let (tx, rx) = tokio::sync::oneshot::channel(); self.mempool.send(MempoolMsg::View { ancestor_hint, reply_channel: tx, }); - let _iter = rx.await; + let iter = rx.await.unwrap(); LeadershipResult::Leader { _view: PhantomData, - block: todo!("form a block from the returned iterator"), + block: Block::new(BlockHeader::default(), iter.map(|ref tx| tx.into())), } } else { LeadershipResult::NotLeader { _view: PhantomData } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 7b03f042..a9a88cf3 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -7,20 +7,18 @@ mod leadership; mod network; pub mod overlay; -#[cfg(test)] -mod test; mod tip; // std use std::collections::BTreeMap; -use std::error::Error; use std::fmt::Debug; +use std::hash::Hash; // crates use serde::{Deserialize, Serialize}; // internal use crate::network::NetworkAdapter; use leadership::{Leadership, LeadershipResult}; -use nomos_core::block::Block; +use nomos_core::block::{Block, TxHash}; use nomos_core::crypto::PublicKey; use nomos_core::fountain::FountainCode; use nomos_core::staking::Stake; @@ -42,6 +40,7 @@ pub type NodeId = PublicKey; // Random seed for each round provided by the protocol pub type Seed = [u8; 32]; +#[derive(Debug)] pub struct CarnotSettings { private_key: [u8; 32], fountain_settings: Fountain::Settings, @@ -123,10 +122,19 @@ where T::Settings: Send + Sync + 'static, T::Outcome: Send + Sync, P::Settings: Send + Sync + 'static, - P::Tx: Debug + Send + Sync + 'static, - P::Id: Debug + Send + Sync + 'static, + P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static, + for<'t> &'t P::Tx: Into, + P::Id: Debug + + Clone + + serde::de::DeserializeOwned + + for<'a> From<&'a P::Tx> + + Eq + + Hash + + Send + + Sync + + 'static, M: MempoolAdapter + Send + Sync + 'static, - O: Overlay + Send + Sync + 'static, + O: Overlay + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -167,7 +175,7 @@ where let fountain = F::new(fountain_settings); let tally = T::new(tally_settings); - let leadership = Leadership::new(private_key, mempool_relay); + let leadership = Leadership::::new(private_key, mempool_relay.clone()); // FIXME: this should be taken from config let mut cur_view = View { seed: [0; 32], @@ -180,7 +188,7 @@ where // FIXME: this should probably have a timer to detect failed rounds let res = cur_view - .resolve::( + .resolve::( private_key, &tip, &network_adapter, @@ -190,10 +198,22 @@ where ) .await; match res { - Ok((_block, view)) => { + Ok((block, view)) => { // resolved block, mark as verified and possibly update the tip // not sure what mark as verified means, e.g. if we want an event subscription // system for this to be used for example by the ledger, storage and mempool + + mempool_relay + .send(nomos_mempool::MempoolMsg::MarkInBlock { + ids: block.transactions().cloned().collect(), + block: block.header(), + }) + .await + .map_err(|(e, _)| { + tracing::error!("Error while sending MarkInBlock message: {}", e); + e + })?; + cur_view = view; } Err(e) => { @@ -217,27 +237,26 @@ pub struct View { impl View { // TODO: might want to encode steps in the type system - pub async fn resolve<'view, A, O, F, T, Tx, Id>( + pub async fn resolve<'view, A, O, F, T, Tx>( &'view self, node_id: NodeId, tip: &Tip, adapter: &A, fountain: &F, tally: &T, - leadership: &Leadership, - ) -> Result<(Block, View), Box> + leadership: &Leadership, + ) -> Result<(Block, View), Box> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, + for<'t> &'t Tx: Into, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, O: Overlay, { let res = if self.is_leader(node_id) { let block = self - .resolve_leader::( - node_id, tip, adapter, fountain, tally, leadership, - ) + .resolve_leader::(node_id, tip, adapter, fountain, tally, leadership) .await .unwrap(); // FIXME: handle sad path let next_view = self.generate_next_view(&block); @@ -257,20 +276,21 @@ impl View { Ok(res) } - async fn resolve_leader<'view, A, O, F, T, Tx, Id>( + async fn resolve_leader<'view, A, O, F, T, Tx>( &'view self, node_id: NodeId, tip: &Tip, adapter: &A, fountain: &F, tally: &T, - leadership: &Leadership, - ) -> Result + leadership: &Leadership, + ) -> Result, ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, + for<'t> &'t Tx: Into, O: Overlay, { let overlay = O::new(self, node_id); @@ -295,7 +315,7 @@ impl View { adapter: &A, fountain: &F, tally: &T, - ) -> Result<(Block, View), ()> + ) -> Result<(Block, View), ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, @@ -332,7 +352,7 @@ impl View { } pub fn is_leader(&self, _node_id: NodeId) -> bool { - false + true } pub fn id(&self) -> u64 { @@ -340,12 +360,12 @@ impl View { } // Verifies the block is new and the previous leader did not fail - fn pipelined_safe_block(&self, _: &Block) -> bool { + fn pipelined_safe_block(&self, _: &Block) -> bool { // return b.view_n >= self.view_n && b.view_n == b.qc.view_n true } - fn generate_next_view(&self, _b: &Block) -> View { + fn generate_next_view(&self, _b: &Block) -> View { let mut seed = self.seed; seed[0] += 1; View { diff --git a/nomos-services/consensus/src/network/adapters/mock.rs b/nomos-services/consensus/src/network/adapters/mock.rs index 3f33692e..ce2bf810 100644 --- a/nomos-services/consensus/src/network/adapters/mock.rs +++ b/nomos-services/consensus/src/network/adapters/mock.rs @@ -108,7 +108,7 @@ impl NetworkAdapter for MockAdapter { .network_relay .send(NetworkMsg::Process(MockBackendMessage::Broadcast { msg: message, - topic: MOCK_PUB_SUB_TOPIC, + topic: MOCK_PUB_SUB_TOPIC.to_string(), })) .await { @@ -164,7 +164,7 @@ impl NetworkAdapter for MockAdapter { .network_relay .send(NetworkMsg::Process(MockBackendMessage::Broadcast { msg: message, - topic: MOCK_PUB_SUB_TOPIC, + topic: MOCK_PUB_SUB_TOPIC.to_string(), })) .await { diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 7aaf31b0..05b1270b 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -1,6 +1,8 @@ // std +use std::hash::Hash; // crates use futures::StreamExt; +use nomos_core::wire::deserializer; use rand::{seq::SliceRandom, SeedableRng}; // internal use super::*; @@ -8,38 +10,44 @@ use crate::network::messages::ProposalChunkMsg; use crate::network::NetworkAdapter; /// View of the tree overlay centered around a specific member -pub struct Member { +pub struct Member { // id is not used now, but gonna probably used it for later checking later on #[allow(dead_code)] id: NodeId, committee: Committee, - committees: Committees, + committees: Committees, view_n: u64, + _marker: std::marker::PhantomData, } /// #Just a newtype index to be able to implement parent/children methods #[derive(Copy, Clone)] pub struct Committee(usize); -pub struct Committees { +pub struct Committees { nodes: Box<[NodeId]>, + _marker: std::marker::PhantomData, } -impl Committees { +impl Committees { pub fn new(view: &View) -> Self { let mut nodes = view.staking_keys.keys().cloned().collect::>(); let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed); nodes.shuffle(&mut rng); - Self { nodes } + Self { + nodes, + _marker: std::marker::PhantomData, + } } - pub fn into_member(self, id: NodeId, view: &View) -> Option> { + pub fn into_member(self, id: NodeId, view: &View) -> Option> { let member_idx = self.nodes.iter().position(|m| m == &id)?; Some(Member { committee: Committee(member_idx / C), committees: self, id, view_n: view.view_n, + _marker: std::marker::PhantomData, }) } @@ -84,7 +92,7 @@ impl Committee { } } -impl Member { +impl Member { /// Return other members of this committee pub fn peers(&self) -> &[NodeId] { self.committees @@ -109,13 +117,15 @@ impl Member { } #[async_trait::async_trait] -impl< - Network: NetworkAdapter + Sync, - Fountain: FountainCode + Sync, - VoteTally: Tally + Sync, - const C: usize, - > Overlay for Member +impl Overlay + for Member +where + Network: NetworkAdapter + Sync, + Fountain: FountainCode + Sync, + VoteTally: Tally + Sync, + TxId: serde::de::DeserializeOwned + Eq + Hash + Clone + Send + Sync + 'static, { + type TxId = TxId; // we still need view here to help us initialize fn new(view: &View, node: NodeId) -> Self { let committees = Committees::new(view); @@ -127,17 +137,21 @@ impl< view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let committee = self.committee; let message_stream = adapter.proposal_chunks_stream(committee, view).await; - fountain.decode(message_stream).await.map(Block::from_bytes) + fountain.decode(message_stream).await.and_then(|b| { + deserializer(&b) + .deserialize::>() + .map_err(|e| FountainError::from(e.to_string().as_str())) + }) } async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -166,7 +180,7 @@ impl< async fn approve_and_forward( &self, view: &View, - _block: &Block, + _block: &Block, _adapter: &Network, _tally: &VoteTally, _next_view: &View, diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index b368fd30..e69efae5 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -1,5 +1,6 @@ // std use std::error::Error; +use std::hash::Hash; // crates use futures::StreamExt; use serde::de::DeserializeOwned; @@ -9,6 +10,7 @@ use super::*; use crate::network::messages::{ProposalChunkMsg, VoteMsg}; use crate::network::NetworkAdapter; use crate::overlay::committees::Committee; +use nomos_core::wire::deserializer; const FLAT_COMMITTEE: Committee = Committee::root(); @@ -16,31 +18,39 @@ const FLAT_COMMITTEE: Committee = Committee::root(); /// As far as the API is concerned, this should be equivalent to any other /// overlay and far simpler to implement. /// For this reason, this might act as a 'reference' overlay for testing. -pub struct Flat { +pub struct Flat { // TODO: this should be a const param, but we can't do that yet node_id: NodeId, view_n: u64, + _marker: std::marker::PhantomData, } -impl Flat { +impl Flat { pub fn new(view_n: u64, node_id: NodeId) -> Self { - Self { node_id, view_n } + Self { + node_id, + view_n, + _marker: Default::default(), + } } - fn approve(&self, _block: &Block) -> Approval { + fn approve(&self, _block: &Block) -> Approval { // we still need to define how votes look like - todo!() + Approval } } #[async_trait::async_trait] -impl Overlay for Flat +impl Overlay for Flat where + TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, VoteTally: Tally + Sync, VoteTally::Vote: Serialize + DeserializeOwned + Send, { + type TxId = TxId; + fn new(view: &View, node: NodeId) -> Self { Flat::new(view.view_n, node) } @@ -50,16 +60,20 @@ where view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await; - fountain.decode(message_stream).await.map(Block::from_bytes) + fountain.decode(message_stream).await.and_then(|b| { + deserializer(&b) + .deserialize::>() + .map_err(|e| FountainError::from(e.to_string().as_str())) + }) } async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -79,7 +93,7 @@ where async fn approve_and_forward( &self, view: &View, - block: &Block, + block: &Block, adapter: &Network, _tally: &VoteTally, _next_view: &View, diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 277efaa9..46c62f61 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -15,6 +15,8 @@ use nomos_core::vote::Tally; /// Dissemination overlay, tied to a specific view #[async_trait::async_trait] pub trait Overlay { + type TxId: serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash + Send + Sync + 'static; + fn new(view: &View, node: NodeId) -> Self; async fn reconstruct_proposal_block( @@ -22,11 +24,11 @@ pub trait Overlay Result; + ) -> Result, FountainError>; async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ); @@ -36,7 +38,7 @@ pub trait Overlay, adapter: &Network, vote_tally: &VoteTally, next_view: &View, diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 89b16ff7..27b56884 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -18,7 +18,7 @@ rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" tracing = "0.1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1" waku-bindings = { version = "0.1.0-rc.2", optional = true} diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index 0ea679da..dbb6cbb6 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -86,6 +86,16 @@ where block_entry.append(&mut txs_in_block); } + #[cfg(test)] + fn block_transactions( + &self, + block: BlockId, + ) -> Option + Send>> { + self.in_block_txs.get(&block).map(|txs| { + Box::new(txs.clone().into_iter()) as Box + Send> + }) + } + fn prune(&mut self, txs: &[Self::Id]) { for tx_id in txs { self.pending_txs.remove(tx_id); diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 6ec1a06c..90258912 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -32,6 +32,13 @@ pub trait MemPool { /// Record that a set of transactions were included in a block fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader); + /// Returns all of the transactions for the block + #[cfg(test)] + fn block_transactions( + &self, + block: BlockId, + ) -> Option + Send>>; + /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. fn prune(&mut self, txs: &[Self::Id]); diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 359dbeed..6867cc64 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -49,6 +49,11 @@ pub enum MempoolMsg { Prune { ids: Vec, }, + #[cfg(test)] + BlockTransaction { + block: BlockId, + reply_channel: Sender + Send>>>, + }, MarkInBlock { ids: Vec, block: BlockHeader, @@ -72,6 +77,10 @@ impl Debug for MempoolMsg { "MempoolMsg::MarkInBlock{{ids: {ids:?}, block: {block:?}}}" ) } + #[cfg(test)] + Self::BlockTransaction { block, .. } => { + write!(f, "MempoolMsg::BlockTransaction{{block: {block:?}}}") + } Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"), } } @@ -152,6 +161,12 @@ where MempoolMsg::MarkInBlock { ids, block } => { pool.mark_in_block(&ids, block); } + #[cfg(test)] + MempoolMsg::BlockTransaction { block, reply_channel } => { + reply_channel.send(pool.block_transactions(block)).unwrap_or_else(|_| { + tracing::debug!("could not send back block transactions") + }); + } MempoolMsg::Prune { ids } => { pool.prune(&ids); }, MempoolMsg::Metrics { reply_channel } => { let metrics = MempoolMetrics { diff --git a/nomos-services/mempool/src/network/adapters/mock.rs b/nomos-services/mempool/src/network/adapters/mock.rs index 55ce6f4d..e107bc5f 100644 --- a/nomos-services/mempool/src/network/adapters/mock.rs +++ b/nomos-services/mempool/src/network/adapters/mock.rs @@ -2,7 +2,7 @@ // crates use futures::{Stream, StreamExt}; -use nomos_core::tx::mock::MockTransactionMsg; +use nomos_core::tx::mock::MockTransaction; use nomos_network::backends::mock::{ EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent, }; @@ -25,7 +25,7 @@ pub struct MockAdapter { #[async_trait::async_trait] impl NetworkAdapter for MockAdapter { type Backend = Mock; - type Tx = MockTransactionMsg; + type Tx = MockTransaction; async fn new( network_relay: OutboundRelay< as ServiceData>::Message>, @@ -48,7 +48,7 @@ impl NetworkAdapter for MockAdapter { if let Err((e, _)) = network_relay .send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe { - topic: MOCK_PUB_SUB_TOPIC, + topic: MOCK_PUB_SUB_TOPIC.to_string(), })) .await { @@ -76,10 +76,10 @@ impl NetworkAdapter for MockAdapter { match event { Ok(NetworkEvent::RawMessage(message)) => { tracing::info!("Received message: {:?}", message.payload()); - if message.content_topic() == MOCK_TX_CONTENT_TOPIC { - Some(MockTransactionMsg::Request(message)) + if message.content_topic().eq(&MOCK_TX_CONTENT_TOPIC) { + Some(MockTransaction::new(message)) } else { - Some(MockTransactionMsg::Response(message)) + None } } Err(_e) => None, diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 3b2f8a47..857a93d2 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -1,6 +1,6 @@ use nomos_core::{ block::BlockId, - tx::mock::{MockTransactionMsg, MockTxId}, + tx::mock::{MockTransaction, MockTxId}, }; use nomos_log::{Logger, LoggerSettings}; use nomos_network::{ @@ -20,7 +20,7 @@ use nomos_mempool::{ struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle>>, + mockpool: ServiceHandle>>, } #[test] @@ -70,7 +70,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app .handle() - .relay::>>(); + .relay::>>(); app.spawn(async move { let network_outbound = network.connect().await.unwrap(); @@ -79,7 +79,7 @@ fn test_mockmempool() { // subscribe to the mock content topic network_outbound .send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe { - topic: MOCK_TX_CONTENT_TOPIC.content_topic_name, + topic: MOCK_TX_CONTENT_TOPIC.content_topic_name.to_string(), })) .await .unwrap(); @@ -99,14 +99,7 @@ fn test_mockmempool() { let items = mrx .await .unwrap() - .into_iter() - .filter_map(|tx| { - if let MockTransactionMsg::Request(msg) = tx { - Some(msg) - } else { - None - } - }) + .map(|msg| msg.message().clone()) .collect::>(); if items.len() == exp_txns.len() { diff --git a/nomos-services/network/src/backends/mock.rs b/nomos-services/network/src/backends/mock.rs index 39d0002b..f8e11331 100644 --- a/nomos-services/network/src/backends/mock.rs +++ b/nomos-services/network/src/backends/mock.rs @@ -11,6 +11,7 @@ use rand::{ }; use serde::{Deserialize, Serialize}; use std::{ + borrow::Cow, collections::{HashMap, HashSet}, sync::{Arc, Mutex}, }; @@ -21,11 +22,11 @@ const BROADCAST_CHANNEL_BUF: usize = 16; pub type MockMessageVersion = usize; -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] pub struct MockContentTopic { - pub application_name: &'static str, + pub application_name: Cow<'static, str>, pub version: usize, - pub content_topic_name: &'static str, + pub content_topic_name: Cow<'static, str>, } impl MockContentTopic { @@ -35,25 +36,27 @@ impl MockContentTopic { content_topic_name: &'static str, ) -> Self { Self { - application_name, + application_name: Cow::Borrowed(application_name), version, - content_topic_name, + content_topic_name: Cow::Borrowed(content_topic_name), } } } #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct MockPubSubTopic { - pub topic_name: &'static str, + pub topic_name: Cow<'static, str>, } impl MockPubSubTopic { pub const fn new(topic_name: &'static str) -> Self { - Self { topic_name } + Self { + topic_name: Cow::Borrowed(topic_name), + } } } -#[derive(Clone, PartialEq, Eq, Hash, Serialize, Debug)] +#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct MockMessage { pub payload: String, @@ -81,8 +84,8 @@ impl MockMessage { } } - pub const fn content_topic(&self) -> MockContentTopic { - self.content_topic + pub const fn content_topic(&self) -> &MockContentTopic { + &self.content_topic } pub fn payload(&self) -> String { @@ -92,9 +95,9 @@ impl MockMessage { #[derive(Clone)] pub struct Mock { - messages: Arc>>>, + messages: Arc>>>, message_event: Sender, - subscribed_topics: Arc>>, + subscribed_topics: Arc>>, config: MockConfig, } @@ -120,17 +123,17 @@ pub enum MockBackendMessage { >, }, Broadcast { - topic: &'static str, + topic: String, msg: MockMessage, }, RelaySubscribe { - topic: &'static str, + topic: String, }, RelayUnSubscribe { - topic: &'static str, + topic: String, }, Query { - topic: &'static str, + topic: String, tx: oneshot::Sender>, }, } @@ -237,7 +240,7 @@ impl NetworkBackend for Mock { config .predefined_messages .iter() - .map(|p| (p.content_topic.content_topic_name, Vec::new())) + .map(|p| (p.content_topic.content_topic_name.to_string(), Vec::new())) .collect(), )), message_event, @@ -273,7 +276,7 @@ impl NetworkBackend for Mock { } MockBackendMessage::RelayUnSubscribe { topic } => { tracing::info!("processed relay unsubscription for topic: {topic}"); - self.subscribed_topics.lock().unwrap().remove(topic); + self.subscribed_topics.lock().unwrap().remove(&topic); } MockBackendMessage::Query { topic, tx } => { tracing::info!("processed query"); @@ -305,9 +308,9 @@ mod tests { MockMessage { payload: "foo".to_string(), content_topic: MockContentTopic { - application_name: "mock network", + application_name: "mock network".into(), version: 0, - content_topic_name: "foo", + content_topic_name: "foo".into(), }, version: 0, timestamp: 0, @@ -315,9 +318,9 @@ mod tests { MockMessage { payload: "bar".to_string(), content_topic: MockContentTopic { - application_name: "mock network", + application_name: "mock network".into(), version: 0, - content_topic_name: "bar", + content_topic_name: "bar".into(), }, version: 0, timestamp: 0, @@ -342,13 +345,13 @@ mod tests { // broadcast for val in FOO_BROADCAST_MESSAGES { mock.process(MockBackendMessage::Broadcast { - topic: "foo", + topic: "foo".to_string(), msg: MockMessage { payload: val.to_string(), content_topic: MockContentTopic { - application_name: "mock", + application_name: "mock".into(), version: 1, - content_topic_name: "foo content", + content_topic_name: "foo content".into(), }, version: 1, timestamp: chrono::Utc::now().timestamp() as usize, @@ -359,13 +362,13 @@ mod tests { for val in BAR_BROADCAST_MESSAGES { mock.process(MockBackendMessage::Broadcast { - topic: "bar", + topic: "bar".to_string(), msg: MockMessage { payload: val.to_string(), content_topic: MockContentTopic { - application_name: "mock", + application_name: "mock".into(), version: 1, - content_topic_name: "bar content", + content_topic_name: "bar content".into(), }, version: 1, timestamp: chrono::Utc::now().timestamp() as usize, @@ -377,7 +380,7 @@ mod tests { // query let (qtx, qrx) = oneshot::channel(); mock.process(MockBackendMessage::Query { - topic: "foo", + topic: "foo".to_string(), tx: qtx, }) .await; @@ -388,20 +391,28 @@ mod tests { } // subscribe - mock.process(MockBackendMessage::RelaySubscribe { topic: "foo" }) - .await; - mock.process(MockBackendMessage::RelaySubscribe { topic: "bar" }) - .await; + mock.process(MockBackendMessage::RelaySubscribe { + topic: "foo".to_string(), + }) + .await; + mock.process(MockBackendMessage::RelaySubscribe { + topic: "bar".to_string(), + }) + .await; assert!(mock.subscribed_topics.lock().unwrap().contains("foo")); assert!(mock.subscribed_topics.lock().unwrap().contains("bar")); // unsubscribe - mock.process(MockBackendMessage::RelayUnSubscribe { topic: "foo" }) - .await; + mock.process(MockBackendMessage::RelayUnSubscribe { + topic: "foo".to_string(), + }) + .await; assert!(!mock.subscribed_topics.lock().unwrap().contains("foo")); assert!(mock.subscribed_topics.lock().unwrap().contains("bar")); - mock.process(MockBackendMessage::RelayUnSubscribe { topic: "bar" }) - .await; + mock.process(MockBackendMessage::RelayUnSubscribe { + topic: "bar".to_string(), + }) + .await; assert!(!mock.subscribed_topics.lock().unwrap().contains("foo")); assert!(!mock.subscribed_topics.lock().unwrap().contains("bar")); }