diff --git a/nodes/mockpool-node/src/bridges.rs b/nodes/mockpool-node/src/bridges.rs index 70c584a9..1d1529e0 100644 --- a/nodes/mockpool-node/src/bridges.rs +++ b/nodes/mockpool-node/src/bridges.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::error; // internal -use crate::tx::{Tx, TxId}; +use crate::tx::Tx; use futures::future::join_all; use multiaddr::Multiaddr; use nomos_core::wire; @@ -27,15 +27,14 @@ pub fn mempool_metrics_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = build_http_bridge::< - MempoolService, MockPool>, - AxumBackend, - _, - >( - handle, HttpMethod::GET, "metrics" - ) - .await - .unwrap(); + let (mempool_channel, mut http_request_channel) = + build_http_bridge::, MockPool>, AxumBackend, _>( + handle, + HttpMethod::GET, + "metrics", + ) + .await + .unwrap(); while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await { if let Err(e) = handle_metrics_req(&mempool_channel, res_tx).await { @@ -50,17 +49,14 @@ pub fn mempool_add_tx_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { Box::new(Box::pin(async move { - let (mempool_channel, mut http_request_channel) = build_http_bridge::< - MempoolService, MockPool>, - AxumBackend, - _, - >( - handle.clone(), - HttpMethod::POST, - "addtx", - ) - .await - .unwrap(); + let (mempool_channel, mut http_request_channel) = + build_http_bridge::, MockPool>, AxumBackend, _>( + handle.clone(), + HttpMethod::POST, + "addtx", + ) + .await + .unwrap(); while let Some(HttpRequest { res_tx, payload, .. @@ -121,7 +117,7 @@ pub fn waku_add_conn_bridge( } async fn handle_metrics_req( - mempool_channel: &OutboundRelay>, + mempool_channel: &OutboundRelay>, res_tx: Sender, ) -> Result<(), overwatch_rs::DynError> { let (sender, receiver) = oneshot::channel(); @@ -147,7 +143,7 @@ async fn handle_metrics_req( async fn handle_add_tx_req( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, - mempool_channel: &OutboundRelay>, + mempool_channel: &OutboundRelay>, res_tx: Sender, payload: Option, ) -> Result<(), overwatch_rs::DynError> { diff --git a/nodes/mockpool-node/src/main.rs b/nodes/mockpool-node/src/main.rs index 5f11a62d..786b0c1b 100644 --- a/nodes/mockpool-node/src/main.rs +++ b/nodes/mockpool-node/src/main.rs @@ -18,7 +18,7 @@ use overwatch_rs::{ }; use serde::Deserialize; use std::sync::Arc; -use tx::{Tx, TxId}; +use tx::Tx; /// Simple program to greet a person #[derive(Parser, Debug)] @@ -39,7 +39,7 @@ struct Config { struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle, MockPool>>, + mockpool: ServiceHandle, MockPool>>, http: ServiceHandle>, bridges: ServiceHandle, } diff --git a/nodes/mockpool-node/src/tx.rs b/nodes/mockpool-node/src/tx.rs index 607626f4..542de0a0 100644 --- a/nodes/mockpool-node/src/tx.rs +++ b/nodes/mockpool-node/src/tx.rs @@ -1,42 +1,20 @@ -use blake2::digest::{Update, VariableOutput}; -use blake2::Blake2bVar; +use bytes::Bytes; +use nomos_core::tx::{Transaction, TransactionHasher}; use serde::{Deserialize, Serialize}; use std::hash::Hash; #[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Tx(pub String); -#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)] -pub struct TxId([u8; 32]); - -impl From<&Tx> for TxId { - fn from(tx: &Tx) -> Self { - let mut hasher = Blake2bVar::new(32).unwrap(); - hasher.update( - bincode::serde::encode_to_vec(tx, bincode::config::standard()) - .unwrap() - .as_slice(), - ); - let mut id = [0u8; 32]; - hasher.finalize_variable(&mut id).unwrap(); - Self(id) - } +fn hash_tx(tx: &Tx) -> String { + tx.0.clone() } -#[cfg(test)] -mod test { - use super::*; +impl Transaction for Tx { + const HASHER: TransactionHasher = hash_tx; + type Hash = String; - #[test] - fn test_txid() { - let tx = Tx("test".to_string()); - let txid = TxId::from(&tx); - assert_eq!( - txid.0, - [ - 39, 227, 252, 176, 211, 134, 68, 39, 134, 158, 47, 7, 82, 40, 169, 232, 168, 118, - 240, 103, 84, 146, 127, 64, 60, 196, 126, 142, 172, 156, 124, 78 - ] - ); + fn as_bytes(&self) -> Bytes { + self.0.as_bytes().to_vec().into() } } diff --git a/nomos-core/Cargo.toml b/nomos-core/Cargo.toml index 14b96383..3ef3bed5 100644 --- a/nomos-core/Cargo.toml +++ b/nomos-core/Cargo.toml @@ -10,6 +10,7 @@ authors = [ [dependencies] async-trait = { version = "0.1" } +blake2 = { version = "0.10" } bytes = "1.3" futures = "0.3" nomos-network = { path = "../nomos-services/network", optional = true } @@ -19,7 +20,6 @@ thiserror = "1.0" bincode = "1.3" once_cell = "1.0" indexmap = { version = "1.9", features = ["serde-1"] } -blake2 = { version = "0.10", optional = true } serde_json = { version = "1", optional = true } [dev-dependencies] @@ -30,4 +30,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] } [features] default = [] raptor = ["raptorq"] -mock = ["nomos-network/mock", "blake2", "serde_json"] +mock = ["nomos-network/mock", "serde_json"] diff --git a/nomos-core/src/block.rs b/nomos-core/src/block.rs index a5f6f938..065bc39e 100644 --- a/nomos-core/src/block.rs +++ b/nomos-core/src/block.rs @@ -10,7 +10,7 @@ pub type TxHash = [u8; 32]; /// A block #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Block { +pub struct Block { header: BlockHeader, transactions: IndexSet, } @@ -24,7 +24,7 @@ pub struct BlockHeader { /// Identifier of a block pub type BlockId = [u8; 32]; -impl Block { +impl Block { pub fn new(header: BlockHeader, txs: impl Iterator) -> Self { Self { header, diff --git a/nomos-core/src/tx/carnot.rs b/nomos-core/src/tx/carnot.rs deleted file mode 100644 index 2523e87a..00000000 --- a/nomos-core/src/tx/carnot.rs +++ /dev/null @@ -1,10 +0,0 @@ -// std -// crates -use serde::{Deserialize, Serialize}; -// internal -pub use crate::tx::transaction::Transaction; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Tx { - Transfer(Transaction), -} diff --git a/nomos-core/src/tx/carnot/mod.rs b/nomos-core/src/tx/carnot/mod.rs new file mode 100644 index 00000000..63d6e319 --- /dev/null +++ b/nomos-core/src/tx/carnot/mod.rs @@ -0,0 +1,35 @@ +// std +// crates +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +// internal +pub use crate::tx::carnot::transaction::TransferTransaction; +use crate::tx::{Transaction, TransactionHasher}; + +mod transaction; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum Tx { + Transfer(TransferTransaction), +} + +// TODO: We should probably abstract the de/serialization of the transaction as it s done in transaction.rs +fn hash_carnot_tx(tx: &Tx) -> [u8; 32] { + use blake2::{ + digest::{consts::U32, Digest}, + Blake2b, + }; + let mut hasher = Blake2b::::new(); + hasher.update(::as_bytes(tx)); + let res = hasher.finalize(); + res.into() +} + +impl Transaction for Tx { + const HASHER: TransactionHasher = hash_carnot_tx; + type Hash = [u8; 32]; + + fn as_bytes(&self) -> Bytes { + [].to_vec().into() + } +} diff --git a/nomos-core/src/tx/transaction.rs b/nomos-core/src/tx/carnot/transaction.rs similarity index 83% rename from nomos-core/src/tx/transaction.rs rename to nomos-core/src/tx/carnot/transaction.rs index d0a0d846..516e9ceb 100644 --- a/nomos-core/src/tx/transaction.rs +++ b/nomos-core/src/tx/carnot/transaction.rs @@ -7,7 +7,7 @@ use crate::crypto::Signature; /// but does not imply that it can be successfully applied /// to the ledger. #[derive(Clone, Debug)] -pub struct Transaction { +pub struct TransferTransaction { pub from: AccountId, pub to: AccountId, pub value: u64, @@ -26,26 +26,26 @@ mod serde { // This would also allow to control ser/de independently from the Rust // representation. #[derive(Serialize, Deserialize)] - struct WireTransaction { + struct WireTransferTransaction { from: AccountId, to: AccountId, value: u64, signature: Signature, } - impl<'de> Deserialize<'de> for Transaction { + impl<'de> Deserialize<'de> for TransferTransaction { fn deserialize(deserializer: D) -> Result where D: Deserializer<'de>, { - let WireTransaction { + let WireTransferTransaction { from, to, value, signature, - } = WireTransaction::deserialize(deserializer)?; + } = WireTransferTransaction::deserialize(deserializer)?; //TODO: check signature - Ok(Transaction { + Ok(TransferTransaction { from, to, value, @@ -54,12 +54,12 @@ mod serde { } } - impl Serialize for Transaction { + impl Serialize for TransferTransaction { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - WireTransaction { + WireTransferTransaction { from: self.from.clone(), to: self.to.clone(), value: self.value, diff --git a/nomos-core/src/tx/mock.rs b/nomos-core/src/tx/mock.rs index 16ce87fa..86c7d303 100644 --- a/nomos-core/src/tx/mock.rs +++ b/nomos-core/src/tx/mock.rs @@ -1,8 +1,11 @@ +use crate::tx::{Transaction, TransactionHasher}; +use crate::wire; use crate::wire::serialize; use blake2::{ digest::{Update, VariableOutput}, Blake2bVar, }; +use bytes::{Bytes, BytesMut}; use nomos_network::backends::mock::MockMessage; #[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] @@ -20,6 +23,27 @@ impl MockTransaction { pub fn message(&self) -> &MockMessage { &self.content } + + pub fn id(&self) -> MockTxId { + self.id + } + + fn as_bytes(&self) -> Bytes { + let mut buff = BytesMut::new(); + wire::serializer_into_buffer(&mut buff) + .serialize_into(&self) + .expect("MockTransaction serialization to buffer failed"); + buff.freeze() + } +} + +impl Transaction for MockTransaction { + const HASHER: TransactionHasher = MockTransaction::id; + type Hash = MockTxId; + + fn as_bytes(&self) -> Bytes { + MockTransaction::as_bytes(self) + } } impl From for MockTransaction { diff --git a/nomos-core/src/tx/mod.rs b/nomos-core/src/tx/mod.rs index fbfb9496..13c744ff 100644 --- a/nomos-core/src/tx/mod.rs +++ b/nomos-core/src/tx/mod.rs @@ -1,4 +1,20 @@ +use std::hash::Hash; +// std +// crates +use bytes::Bytes; +// internal + pub mod carnot; #[cfg(feature = "mock")] pub mod mock; -mod transaction; + +pub type TransactionHasher = fn(&T) -> ::Hash; + +pub trait Transaction { + const HASHER: TransactionHasher; + type Hash: Hash + Eq + Clone; + fn hash(&self) -> Self::Hash { + Self::HASHER(self) + } + fn as_bytes(&self) -> Bytes; +} diff --git a/nomos-services/consensus/src/leadership.rs b/nomos-services/consensus/src/leadership.rs index 27af0e5a..5a8f0dcf 100644 --- a/nomos-services/consensus/src/leadership.rs +++ b/nomos-services/consensus/src/leadership.rs @@ -2,6 +2,7 @@ use std::marker::PhantomData; // crates // internal +use nomos_core::tx::Transaction; use nomos_core::{block::BlockHeader, crypto::PrivateKey}; use nomos_mempool::MempoolMsg; @@ -12,12 +13,12 @@ struct Enclave { key: PrivateKey, } -pub struct Leadership { +pub struct Leadership { key: Enclave, - mempool: OutboundRelay>, + mempool: OutboundRelay>, } -pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> { +pub enum LeadershipResult<'view, TxId: Clone + Eq + core::hash::Hash> { Leader { block: Block, _view: PhantomData<&'view u8>, @@ -27,12 +28,12 @@ pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> { }, } -impl Leadership +impl Leadership where - Id: Eq + core::hash::Hash, - for<'t> &'t Tx: Into, // TODO: we should probably abstract this away but for now the constrain may do + Tx: Transaction, + Tx::Hash: Debug, { - pub fn new(key: PrivateKey, mempool: OutboundRelay>) -> Self { + pub fn new(key: PrivateKey, mempool: OutboundRelay>) -> Self { Self { key: Enclave { key }, mempool, @@ -45,7 +46,7 @@ where view: &'view View, tip: &Tip, qc: Qc, - ) -> LeadershipResult<'view, Id> { + ) -> LeadershipResult<'view, Tx::Hash> { // TODO: get the correct ancestor for the tip // let ancestor_hint = todo!("get the ancestor from the tip"); let ancestor_hint = [0; 32]; @@ -59,7 +60,10 @@ where LeadershipResult::Leader { _view: PhantomData, - block: Block::new(BlockHeader::default(), iter.map(|ref tx| tx.into())), + block: Block::new( + BlockHeader::default(), + iter.map(|ref tx| ::hash(tx)), + ), } } else { LeadershipResult::NotLeader { _view: PhantomData } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index a9a88cf3..5a7e173b 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -18,10 +18,11 @@ use serde::{Deserialize, Serialize}; // internal use crate::network::NetworkAdapter; use leadership::{Leadership, LeadershipResult}; -use nomos_core::block::{Block, TxHash}; +use nomos_core::block::Block; use nomos_core::crypto::PublicKey; use nomos_core::fountain::FountainCode; use nomos_core::staking::Stake; +use nomos_core::tx::Transaction; use nomos_core::vote::Tally; use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService}; use nomos_network::NetworkService; @@ -79,9 +80,9 @@ where M: MempoolAdapter, P: MemPool, T: Tally, - O: Overlay, - P::Tx: Debug + 'static, - P::Id: Debug + 'static, + O: Overlay::Hash>, + P::Tx: Transaction + Debug + 'static, + ::Hash: Debug, A::Backend: 'static, { service_state: ServiceStateHandle, @@ -100,10 +101,10 @@ where A: NetworkAdapter, P: MemPool, T: Tally, - P::Tx: Debug, - P::Id: Debug, + P::Tx: Transaction + Debug, + ::Hash: Debug, M: MempoolAdapter, - O: Overlay, + O: Overlay::Hash>, { const SERVICE_ID: ServiceId = "Carnot"; type Settings = CarnotSettings; @@ -123,18 +124,9 @@ where T::Outcome: Send + Sync, P::Settings: Send + Sync + 'static, P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static, - for<'t> &'t P::Tx: Into, - P::Id: Debug - + Clone - + serde::de::DeserializeOwned - + for<'a> From<&'a P::Tx> - + Eq - + Hash - + Send - + Sync - + 'static, + ::Hash: Debug + Send + Sync, M: MempoolAdapter + Send + Sync + 'static, - O: Overlay + Send + Sync + 'static, + O: Overlay::Hash> + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -175,7 +167,7 @@ where let fountain = F::new(fountain_settings); let tally = T::new(tally_settings); - let leadership = Leadership::::new(private_key, mempool_relay.clone()); + let leadership = Leadership::::new(private_key, mempool_relay.clone()); // FIXME: this should be taken from config let mut cur_view = View { seed: [0; 32], @@ -244,15 +236,16 @@ impl View { adapter: &A, fountain: &F, tally: &T, - leadership: &Leadership, - ) -> Result<(Block, View), Box> + leadership: &Leadership, + ) -> Result<(Block, View), Box> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, - for<'t> &'t Tx: Into, + Tx: Transaction, + Tx::Hash: Debug, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, - O: Overlay, + O: Overlay, { let res = if self.is_leader(node_id) { let block = self @@ -262,7 +255,7 @@ impl View { let next_view = self.generate_next_view(&block); (block, next_view) } else { - self.resolve_non_leader::(node_id, adapter, fountain, tally) + self.resolve_non_leader::(node_id, adapter, fountain, tally) .await .unwrap() // FIXME: handle sad path }; @@ -283,15 +276,16 @@ impl View { adapter: &A, fountain: &F, tally: &T, - leadership: &Leadership, - ) -> Result, ()> + leadership: &Leadership, + ) -> Result::Hash>, ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, T: Tally + Send + Sync + 'static, T::Outcome: Send + Sync, - for<'t> &'t Tx: Into, - O: Overlay, + Tx: Transaction, + Tx::Hash: Debug, + O: Overlay, { let overlay = O::new(self, node_id); @@ -309,18 +303,20 @@ impl View { Ok(block) } - async fn resolve_non_leader<'view, A, O, F, T>( + async fn resolve_non_leader<'view, A, O, F, T, Tx>( &'view self, node_id: NodeId, adapter: &A, fountain: &F, tally: &T, - ) -> Result<(Block, View), ()> + ) -> Result<(Block, View), ()> where A: NetworkAdapter + Send + Sync + 'static, F: FountainCode, T: Tally + Send + Sync + 'static, - O: Overlay, + Tx: Transaction, + Tx::Hash: Debug, + O: Overlay, { let overlay = O::new(self, node_id); // Consensus in Carnot is achieved in 2 steps from the point of view of a node: @@ -360,12 +356,12 @@ impl View { } // Verifies the block is new and the previous leader did not fail - fn pipelined_safe_block(&self, _: &Block) -> bool { + fn pipelined_safe_block(&self, _: &Block) -> bool { // return b.view_n >= self.view_n && b.view_n == b.qc.view_n true } - fn generate_next_view(&self, _b: &Block) -> View { + fn generate_next_view(&self, _b: &Block) -> View { let mut seed = self.seed; seed[0] += 1; View { diff --git a/nomos-services/consensus/src/overlay/committees.rs b/nomos-services/consensus/src/overlay/committees.rs index 05b1270b..23f657c8 100644 --- a/nomos-services/consensus/src/overlay/committees.rs +++ b/nomos-services/consensus/src/overlay/committees.rs @@ -10,44 +10,38 @@ use crate::network::messages::ProposalChunkMsg; use crate::network::NetworkAdapter; /// View of the tree overlay centered around a specific member -pub struct Member { +pub struct Member { // id is not used now, but gonna probably used it for later checking later on #[allow(dead_code)] id: NodeId, committee: Committee, - committees: Committees, + committees: Committees, view_n: u64, - _marker: std::marker::PhantomData, } /// #Just a newtype index to be able to implement parent/children methods #[derive(Copy, Clone)] pub struct Committee(usize); -pub struct Committees { +pub struct Committees { nodes: Box<[NodeId]>, - _marker: std::marker::PhantomData, } -impl Committees { +impl Committees { pub fn new(view: &View) -> Self { let mut nodes = view.staking_keys.keys().cloned().collect::>(); let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed); nodes.shuffle(&mut rng); - Self { - nodes, - _marker: std::marker::PhantomData, - } + Self { nodes } } - pub fn into_member(self, id: NodeId, view: &View) -> Option> { + pub fn into_member(self, id: NodeId, view: &View) -> Option> { let member_idx = self.nodes.iter().position(|m| m == &id)?; Some(Member { committee: Committee(member_idx / C), committees: self, id, view_n: view.view_n, - _marker: std::marker::PhantomData, }) } @@ -92,7 +86,7 @@ impl Committee { } } -impl Member { +impl Member { /// Return other members of this committee pub fn peers(&self) -> &[NodeId] { self.committees @@ -117,15 +111,14 @@ impl Member { } #[async_trait::async_trait] -impl Overlay - for Member +impl Overlay + for Member where Network: NetworkAdapter + Sync, Fountain: FountainCode + Sync, VoteTally: Tally + Sync, - TxId: serde::de::DeserializeOwned + Eq + Hash + Clone + Send + Sync + 'static, + TxId: serde::de::DeserializeOwned + Clone + Hash + Eq + Send + Sync + 'static, { - type TxId = TxId; // we still need view here to help us initialize fn new(view: &View, node: NodeId) -> Self { let committees = Committees::new(view); @@ -137,13 +130,13 @@ where view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result, FountainError> { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let committee = self.committee; let message_stream = adapter.proposal_chunks_stream(committee, view).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) - .deserialize::>() + .deserialize::>() .map_err(|e| FountainError::from(e.to_string().as_str())) }) } @@ -151,7 +144,7 @@ where async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -180,7 +173,7 @@ where async fn approve_and_forward( &self, view: &View, - _block: &Block, + _block: &Block, _adapter: &Network, _tally: &VoteTally, _next_view: &View, diff --git a/nomos-services/consensus/src/overlay/flat.rs b/nomos-services/consensus/src/overlay/flat.rs index e69efae5..2b052a36 100644 --- a/nomos-services/consensus/src/overlay/flat.rs +++ b/nomos-services/consensus/src/overlay/flat.rs @@ -25,7 +25,7 @@ pub struct Flat { _marker: std::marker::PhantomData, } -impl Flat { +impl Flat { pub fn new(view_n: u64, node_id: NodeId) -> Self { Self { node_id, @@ -41,7 +41,7 @@ impl Flat { } #[async_trait::async_trait] -impl Overlay for Flat +impl Overlay for Flat where TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, Network: NetworkAdapter + Sync, @@ -49,8 +49,6 @@ where VoteTally: Tally + Sync, VoteTally::Vote: Serialize + DeserializeOwned + Send, { - type TxId = TxId; - fn new(view: &View, node: NodeId) -> Self { Flat::new(view.view_n, node) } @@ -60,12 +58,12 @@ where view: &View, adapter: &Network, fountain: &Fountain, - ) -> Result, FountainError> { + ) -> Result, FountainError> { assert_eq!(view.view_n, self.view_n, "view_n mismatch"); let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await; fountain.decode(message_stream).await.and_then(|b| { deserializer(&b) - .deserialize::>() + .deserialize::>() .map_err(|e| FountainError::from(e.to_string().as_str())) }) } @@ -73,7 +71,7 @@ where async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ) { @@ -93,7 +91,7 @@ where async fn approve_and_forward( &self, view: &View, - block: &Block, + block: &Block, adapter: &Network, _tally: &VoteTally, _next_view: &View, diff --git a/nomos-services/consensus/src/overlay/mod.rs b/nomos-services/consensus/src/overlay/mod.rs index 46c62f61..90664d92 100644 --- a/nomos-services/consensus/src/overlay/mod.rs +++ b/nomos-services/consensus/src/overlay/mod.rs @@ -3,6 +3,7 @@ mod flat; // std use std::error::Error; +use std::hash::Hash; // crates // internal use super::{Approval, NodeId, View}; @@ -14,9 +15,13 @@ use nomos_core::vote::Tally; /// Dissemination overlay, tied to a specific view #[async_trait::async_trait] -pub trait Overlay { - type TxId: serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash + Send + Sync + 'static; - +pub trait Overlay< + Network: NetworkAdapter, + Fountain: FountainCode, + VoteTally: Tally, + TxId: Clone + Eq + Hash, +> +{ fn new(view: &View, node: NodeId) -> Self; async fn reconstruct_proposal_block( @@ -24,11 +29,11 @@ pub trait Overlay Result, FountainError>; + ) -> Result, FountainError>; async fn broadcast_block( &self, view: &View, - block: Block, + block: Block, adapter: &Network, fountain: &Fountain, ); @@ -38,7 +43,7 @@ pub trait Overlay, + block: &Block, adapter: &Network, vote_tally: &VoteTally, next_view: &View, diff --git a/nomos-services/mempool/src/backend/mockpool.rs b/nomos-services/mempool/src/backend/mockpool.rs index dbb6cbb6..df86d089 100644 --- a/nomos-services/mempool/src/backend/mockpool.rs +++ b/nomos-services/mempool/src/backend/mockpool.rs @@ -7,19 +7,20 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH}; // internal use crate::backend::{MemPool, MempoolError}; use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::tx::Transaction; /// A mock mempool implementation that stores all transactions in memory in the order received. -pub struct MockPool { - pending_txs: LinkedHashMap, +pub struct MockPool +where + Tx::Hash: Hash, +{ + pending_txs: LinkedHashMap, in_block_txs: BTreeMap>, - in_block_txs_by_id: BTreeMap, + in_block_txs_by_id: BTreeMap, last_tx_timestamp: u64, } -impl Default for MockPool -where - Id: Eq + Hash, -{ +impl Default for MockPool { fn default() -> Self { Self { pending_txs: LinkedHashMap::new(), @@ -30,30 +31,29 @@ where } } -impl MockPool +impl MockPool where - Id: Eq + Hash, + Tx::Hash: Ord, { pub fn new() -> Self { Default::default() } } -impl MemPool for MockPool +impl MemPool for MockPool where - Id: for<'t> From<&'t Tx> + PartialOrd + Ord + Eq + Hash + Clone, - Tx: Clone + Send + Sync + 'static + Hash, + Tx: Transaction + Clone + Send + Sync + 'static + Hash, + Tx::Hash: Ord, { type Settings = (); type Tx = Tx; - type Id = Id; fn new(_settings: Self::Settings) -> Self { Self::new() } fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> { - let id = Id::from(&tx); + let id = ::hash(&tx); if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { return Err(MempoolError::ExistingTx); } @@ -73,7 +73,7 @@ where Box::new(pending_txs.into_iter()) } - fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader) { + fn mark_in_block(&mut self, txs: &[::Hash], block: BlockHeader) { let mut txs_in_block = Vec::with_capacity(txs.len()); for tx_id in txs.iter() { if let Some(tx) = self.pending_txs.remove(tx_id) { @@ -96,7 +96,7 @@ where }) } - fn prune(&mut self, txs: &[Self::Id]) { + fn prune(&mut self, txs: &[::Hash]) { for tx_id in txs { self.pending_txs.remove(tx_id); } diff --git a/nomos-services/mempool/src/backend/mod.rs b/nomos-services/mempool/src/backend/mod.rs index 90258912..ffe7af65 100644 --- a/nomos-services/mempool/src/backend/mod.rs +++ b/nomos-services/mempool/src/backend/mod.rs @@ -2,6 +2,7 @@ pub mod mockpool; use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::tx::Transaction; #[derive(thiserror::Error, Debug)] pub enum MempoolError { @@ -13,8 +14,7 @@ pub enum MempoolError { pub trait MemPool { type Settings: Clone; - type Tx; - type Id; + type Tx: Transaction; /// Construct a new empty pool fn new(settings: Self::Settings) -> Self; @@ -30,7 +30,7 @@ pub trait MemPool { fn view(&self, ancestor_hint: BlockId) -> Box + Send>; /// Record that a set of transactions were included in a block - fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader); + fn mark_in_block(&mut self, txs: &[::Hash], block: BlockHeader); /// Returns all of the transactions for the block #[cfg(test)] @@ -41,7 +41,7 @@ pub trait MemPool { /// Signal that a set of transactions can't be possibly requested anymore and can be /// discarded. - fn prune(&mut self, txs: &[Self::Id]); + fn prune(&mut self, txs: &[::Hash]); fn pending_tx_count(&self) -> usize; fn last_tx_timestamp(&self) -> u64; diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 6867cc64..a5275d99 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -11,6 +11,7 @@ use tokio::sync::oneshot::Sender; use crate::network::NetworkAdapter; use backend::MemPool; use nomos_core::block::{BlockHeader, BlockId}; +use nomos_core::tx::Transaction; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -25,7 +26,7 @@ where P: MemPool, P::Settings: Clone, P::Tx: Debug + 'static, - P::Id: Debug + 'static, + ::Hash: Debug, { service_state: ServiceStateHandle, network_relay: Relay>, @@ -37,7 +38,7 @@ pub struct MempoolMetrics { pub last_tx_timestamp: u64, } -pub enum MempoolMsg { +pub enum MempoolMsg { AddTx { tx: Tx, reply_channel: Sender>, @@ -47,7 +48,7 @@ pub enum MempoolMsg { reply_channel: Sender + Send>>, }, Prune { - ids: Vec, + ids: Vec, }, #[cfg(test)] BlockTransaction { @@ -55,7 +56,7 @@ pub enum MempoolMsg { reply_channel: Sender + Send>>>, }, MarkInBlock { - ids: Vec, + ids: Vec, block: BlockHeader, }, Metrics { @@ -63,7 +64,10 @@ pub enum MempoolMsg { }, } -impl Debug for MempoolMsg { +impl Debug for MempoolMsg +where + Tx::Hash: Debug, +{ fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { match self { Self::View { ancestor_hint, .. } => { @@ -86,7 +90,7 @@ impl Debug for MempoolMsg { } } -impl RelayMessage for MempoolMsg {} +impl RelayMessage for MempoolMsg {} impl ServiceData for MempoolService where @@ -94,13 +98,13 @@ where P: MemPool, P::Settings: Clone, P::Tx: Debug + 'static, - P::Id: Debug + 'static, + ::Hash: Debug, { const SERVICE_ID: ServiceId = "Mempool"; type Settings = P::Settings; type State = NoState; type StateOperator = NoOperator; - type Message = MempoolMsg<

::Tx,

::Id>; + type Message = MempoolMsg<

::Tx>; } #[async_trait::async_trait] @@ -108,8 +112,8 @@ impl ServiceCore for MempoolService where P: MemPool + Send + 'static, P::Settings: Clone + Send + Sync + 'static, - P::Id: Debug + Send + 'static, - P::Tx: Clone + Debug + Send + Sync + 'static, + P::Tx: Transaction + Clone + Debug + Send + Sync + 'static, + ::Hash: Debug + Send + Sync + 'static, N: NetworkAdapter + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { diff --git a/nomos-services/mempool/tests/mock.rs b/nomos-services/mempool/tests/mock.rs index 857a93d2..8e85df0e 100644 --- a/nomos-services/mempool/tests/mock.rs +++ b/nomos-services/mempool/tests/mock.rs @@ -1,7 +1,4 @@ -use nomos_core::{ - block::BlockId, - tx::mock::{MockTransaction, MockTxId}, -}; +use nomos_core::{block::BlockId, tx::mock::MockTransaction}; use nomos_log::{Logger, LoggerSettings}; use nomos_network::{ backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage}, @@ -20,7 +17,7 @@ use nomos_mempool::{ struct MockPoolNode { logging: ServiceHandle, network: ServiceHandle>, - mockpool: ServiceHandle>>, + mockpool: ServiceHandle>>, } #[test] @@ -70,7 +67,7 @@ fn test_mockmempool() { let network = app.handle().relay::>(); let mempool = app .handle() - .relay::>>(); + .relay::>>(); app.spawn(async move { let network_outbound = network.connect().await.unwrap();