Make a transaction trait (#98)

* Impl Transaction trait

* Impl Transaction for MockTransaction

* <ake carnot transaction a module

* Refactor consensus to use Transaction

* Fix tests

* Constrain Transaction::Hash

* Refactor redundant Carnot in CarnotTx
This commit is contained in:
Daniel Sanchez 2023-03-16 22:42:56 -07:00 committed by GitHub
parent 780276497f
commit 91ce4e6fa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 228 additions and 192 deletions

View File

@ -6,7 +6,7 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::error;
// internal
use crate::tx::{Tx, TxId};
use crate::tx::Tx;
use futures::future::join_all;
use multiaddr::Multiaddr;
use nomos_core::wire;
@ -27,15 +27,14 @@ pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>,
AxumBackend,
_,
>(
handle, HttpMethod::GET, "metrics"
)
.await
.unwrap();
let (mempool_channel, mut http_request_channel) =
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
handle,
HttpMethod::GET,
"metrics",
)
.await
.unwrap();
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
if let Err(e) = handle_metrics_req(&mempool_channel, res_tx).await {
@ -50,17 +49,14 @@ pub fn mempool_add_tx_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (mempool_channel, mut http_request_channel) = build_http_bridge::<
MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>,
AxumBackend,
_,
>(
handle.clone(),
HttpMethod::POST,
"addtx",
)
.await
.unwrap();
let (mempool_channel, mut http_request_channel) =
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
handle.clone(),
HttpMethod::POST,
"addtx",
)
.await
.unwrap();
while let Some(HttpRequest {
res_tx, payload, ..
@ -121,7 +117,7 @@ pub fn waku_add_conn_bridge(
}
async fn handle_metrics_req(
mempool_channel: &OutboundRelay<MempoolMsg<Tx, TxId>>,
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let (sender, receiver) = oneshot::channel();
@ -147,7 +143,7 @@ async fn handle_metrics_req(
async fn handle_add_tx_req(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
mempool_channel: &OutboundRelay<MempoolMsg<Tx, TxId>>,
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
res_tx: Sender<HttpResponse>,
payload: Option<Bytes>,
) -> Result<(), overwatch_rs::DynError> {

View File

@ -18,7 +18,7 @@ use overwatch_rs::{
};
use serde::Deserialize;
use std::sync::Arc;
use tx::{Tx, TxId};
use tx::Tx;
/// Simple program to greet a person
#[derive(Parser, Debug)]
@ -39,7 +39,7 @@ struct Config {
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Waku>>,
mockpool: ServiceHandle<MempoolService<WakuAdapter<Tx>, MockPool<TxId, Tx>>>,
mockpool: ServiceHandle<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>>,
http: ServiceHandle<HttpService<AxumBackend>>,
bridges: ServiceHandle<HttpBridgeService>,
}

View File

@ -1,42 +1,20 @@
use blake2::digest::{Update, VariableOutput};
use blake2::Blake2bVar;
use bytes::Bytes;
use nomos_core::tx::{Transaction, TransactionHasher};
use serde::{Deserialize, Serialize};
use std::hash::Hash;
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
pub struct Tx(pub String);
#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)]
pub struct TxId([u8; 32]);
impl From<&Tx> for TxId {
fn from(tx: &Tx) -> Self {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(
bincode::serde::encode_to_vec(tx, bincode::config::standard())
.unwrap()
.as_slice(),
);
let mut id = [0u8; 32];
hasher.finalize_variable(&mut id).unwrap();
Self(id)
}
fn hash_tx(tx: &Tx) -> String {
tx.0.clone()
}
#[cfg(test)]
mod test {
use super::*;
impl Transaction for Tx {
const HASHER: TransactionHasher<Self> = hash_tx;
type Hash = String;
#[test]
fn test_txid() {
let tx = Tx("test".to_string());
let txid = TxId::from(&tx);
assert_eq!(
txid.0,
[
39, 227, 252, 176, 211, 134, 68, 39, 134, 158, 47, 7, 82, 40, 169, 232, 168, 118,
240, 103, 84, 146, 127, 64, 60, 196, 126, 142, 172, 156, 124, 78
]
);
fn as_bytes(&self) -> Bytes {
self.0.as_bytes().to_vec().into()
}
}

View File

@ -10,6 +10,7 @@ authors = [
[dependencies]
async-trait = { version = "0.1" }
blake2 = { version = "0.10" }
bytes = "1.3"
futures = "0.3"
nomos-network = { path = "../nomos-services/network", optional = true }
@ -19,7 +20,6 @@ thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
indexmap = { version = "1.9", features = ["serde-1"] }
blake2 = { version = "0.10", optional = true }
serde_json = { version = "1", optional = true }
[dev-dependencies]
@ -30,4 +30,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] }
[features]
default = []
raptor = ["raptorq"]
mock = ["nomos-network/mock", "blake2", "serde_json"]
mock = ["nomos-network/mock", "serde_json"]

View File

@ -10,7 +10,7 @@ pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Block<TxId: Eq + Hash> {
pub struct Block<TxId: Clone + Eq + Hash> {
header: BlockHeader,
transactions: IndexSet<TxId>,
}
@ -24,7 +24,7 @@ pub struct BlockHeader {
/// Identifier of a block
pub type BlockId = [u8; 32];
impl<TxId: Eq + Hash> Block<TxId> {
impl<TxId: Clone + Eq + Hash> Block<TxId> {
pub fn new(header: BlockHeader, txs: impl Iterator<Item = TxId>) -> Self {
Self {
header,

View File

@ -1,10 +0,0 @@
// std
// crates
use serde::{Deserialize, Serialize};
// internal
pub use crate::tx::transaction::Transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Tx {
Transfer(Transaction),
}

View File

@ -0,0 +1,35 @@
// std
// crates
use bytes::Bytes;
use serde::{Deserialize, Serialize};
// internal
pub use crate::tx::carnot::transaction::TransferTransaction;
use crate::tx::{Transaction, TransactionHasher};
mod transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Tx {
Transfer(TransferTransaction),
}
// TODO: We should probably abstract the de/serialization of the transaction as it s done in transaction.rs
fn hash_carnot_tx(tx: &Tx) -> [u8; 32] {
use blake2::{
digest::{consts::U32, Digest},
Blake2b,
};
let mut hasher = Blake2b::<U32>::new();
hasher.update(<Tx as Transaction>::as_bytes(tx));
let res = hasher.finalize();
res.into()
}
impl Transaction for Tx {
const HASHER: TransactionHasher<Self> = hash_carnot_tx;
type Hash = [u8; 32];
fn as_bytes(&self) -> Bytes {
[].to_vec().into()
}
}

View File

@ -7,7 +7,7 @@ use crate::crypto::Signature;
/// but does not imply that it can be successfully applied
/// to the ledger.
#[derive(Clone, Debug)]
pub struct Transaction {
pub struct TransferTransaction {
pub from: AccountId,
pub to: AccountId,
pub value: u64,
@ -26,26 +26,26 @@ mod serde {
// This would also allow to control ser/de independently from the Rust
// representation.
#[derive(Serialize, Deserialize)]
struct WireTransaction {
struct WireTransferTransaction {
from: AccountId,
to: AccountId,
value: u64,
signature: Signature,
}
impl<'de> Deserialize<'de> for Transaction {
impl<'de> Deserialize<'de> for TransferTransaction {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let WireTransaction {
let WireTransferTransaction {
from,
to,
value,
signature,
} = WireTransaction::deserialize(deserializer)?;
} = WireTransferTransaction::deserialize(deserializer)?;
//TODO: check signature
Ok(Transaction {
Ok(TransferTransaction {
from,
to,
value,
@ -54,12 +54,12 @@ mod serde {
}
}
impl Serialize for Transaction {
impl Serialize for TransferTransaction {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
WireTransaction {
WireTransferTransaction {
from: self.from.clone(),
to: self.to.clone(),
value: self.value,

View File

@ -1,8 +1,11 @@
use crate::tx::{Transaction, TransactionHasher};
use crate::wire;
use crate::wire::serialize;
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
use bytes::{Bytes, BytesMut};
use nomos_network::backends::mock::MockMessage;
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
@ -20,6 +23,27 @@ impl MockTransaction {
pub fn message(&self) -> &MockMessage {
&self.content
}
pub fn id(&self) -> MockTxId {
self.id
}
fn as_bytes(&self) -> Bytes {
let mut buff = BytesMut::new();
wire::serializer_into_buffer(&mut buff)
.serialize_into(&self)
.expect("MockTransaction serialization to buffer failed");
buff.freeze()
}
}
impl Transaction for MockTransaction {
const HASHER: TransactionHasher<Self> = MockTransaction::id;
type Hash = MockTxId;
fn as_bytes(&self) -> Bytes {
MockTransaction::as_bytes(self)
}
}
impl From<nomos_network::backends::mock::MockMessage> for MockTransaction {

View File

@ -1,4 +1,20 @@
use std::hash::Hash;
// std
// crates
use bytes::Bytes;
// internal
pub mod carnot;
#[cfg(feature = "mock")]
pub mod mock;
mod transaction;
pub type TransactionHasher<T> = fn(&T) -> <T as Transaction>::Hash;
pub trait Transaction {
const HASHER: TransactionHasher<Self>;
type Hash: Hash + Eq + Clone;
fn hash(&self) -> Self::Hash {
Self::HASHER(self)
}
fn as_bytes(&self) -> Bytes;
}

View File

@ -2,6 +2,7 @@
use std::marker::PhantomData;
// crates
// internal
use nomos_core::tx::Transaction;
use nomos_core::{block::BlockHeader, crypto::PrivateKey};
use nomos_mempool::MempoolMsg;
@ -12,12 +13,12 @@ struct Enclave {
key: PrivateKey,
}
pub struct Leadership<Tx, Id> {
pub struct Leadership<Tx: Transaction> {
key: Enclave,
mempool: OutboundRelay<MempoolMsg<Tx, Id>>,
mempool: OutboundRelay<MempoolMsg<Tx>>,
}
pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> {
pub enum LeadershipResult<'view, TxId: Clone + Eq + core::hash::Hash> {
Leader {
block: Block<TxId>,
_view: PhantomData<&'view u8>,
@ -27,12 +28,12 @@ pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> {
},
}
impl<Tx, Id> Leadership<Tx, Id>
impl<Tx> Leadership<Tx>
where
Id: Eq + core::hash::Hash,
for<'t> &'t Tx: Into<Id>, // TODO: we should probably abstract this away but for now the constrain may do
Tx: Transaction,
Tx::Hash: Debug,
{
pub fn new(key: PrivateKey, mempool: OutboundRelay<MempoolMsg<Tx, Id>>) -> Self {
pub fn new(key: PrivateKey, mempool: OutboundRelay<MempoolMsg<Tx>>) -> Self {
Self {
key: Enclave { key },
mempool,
@ -45,7 +46,7 @@ where
view: &'view View,
tip: &Tip,
qc: Qc,
) -> LeadershipResult<'view, Id> {
) -> LeadershipResult<'view, Tx::Hash> {
// TODO: get the correct ancestor for the tip
// let ancestor_hint = todo!("get the ancestor from the tip");
let ancestor_hint = [0; 32];
@ -59,7 +60,10 @@ where
LeadershipResult::Leader {
_view: PhantomData,
block: Block::new(BlockHeader::default(), iter.map(|ref tx| tx.into())),
block: Block::new(
BlockHeader::default(),
iter.map(|ref tx| <Tx as Transaction>::hash(tx)),
),
}
} else {
LeadershipResult::NotLeader { _view: PhantomData }

View File

@ -18,10 +18,11 @@ use serde::{Deserialize, Serialize};
// internal
use crate::network::NetworkAdapter;
use leadership::{Leadership, LeadershipResult};
use nomos_core::block::{Block, TxHash};
use nomos_core::block::Block;
use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake;
use nomos_core::tx::Transaction;
use nomos_core::vote::Tally;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter, MempoolService};
use nomos_network::NetworkService;
@ -79,9 +80,9 @@ where
M: MempoolAdapter<Tx = P::Tx>,
P: MemPool,
T: Tally,
O: Overlay<A, F, T>,
P::Tx: Debug + 'static,
P::Id: Debug + 'static,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash>,
P::Tx: Transaction + Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
A::Backend: 'static,
{
service_state: ServiceStateHandle<Self>,
@ -100,10 +101,10 @@ where
A: NetworkAdapter,
P: MemPool,
T: Tally,
P::Tx: Debug,
P::Id: Debug,
P::Tx: Transaction + Debug,
<P::Tx as Transaction>::Hash: Debug,
M: MempoolAdapter<Tx = P::Tx>,
O: Overlay<A, F, T>,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash>,
{
const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<F, T>;
@ -123,18 +124,9 @@ where
T::Outcome: Send + Sync,
P::Settings: Send + Sync + 'static,
P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static,
for<'t> &'t P::Tx: Into<TxHash>,
P::Id: Debug
+ Clone
+ serde::de::DeserializeOwned
+ for<'a> From<&'a P::Tx>
+ Eq
+ Hash
+ Send
+ Sync
+ 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay<A, F, T, TxId = P::Id> + Send + Sync + 'static,
O: Overlay<A, F, T, <P::Tx as Transaction>::Hash> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -175,7 +167,7 @@ where
let fountain = F::new(fountain_settings);
let tally = T::new(tally_settings);
let leadership = Leadership::<P::Tx, P::Id>::new(private_key, mempool_relay.clone());
let leadership = Leadership::<P::Tx>::new(private_key, mempool_relay.clone());
// FIXME: this should be taken from config
let mut cur_view = View {
seed: [0; 32],
@ -244,15 +236,16 @@ impl View {
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, O::TxId>,
) -> Result<(Block<O::TxId>, View), Box<dyn std::error::Error + Send + Sync + 'static>>
leadership: &Leadership<Tx>,
) -> Result<(Block<Tx::Hash>, View), Box<dyn std::error::Error + Send + Sync + 'static>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
for<'t> &'t Tx: Into<O::TxId>,
Tx: Transaction,
Tx::Hash: Debug,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
O: Overlay<A, F, T>,
O: Overlay<A, F, T, Tx::Hash>,
{
let res = if self.is_leader(node_id) {
let block = self
@ -262,7 +255,7 @@ impl View {
let next_view = self.generate_next_view(&block);
(block, next_view)
} else {
self.resolve_non_leader::<A, O, F, T>(node_id, adapter, fountain, tally)
self.resolve_non_leader::<A, O, F, T, Tx>(node_id, adapter, fountain, tally)
.await
.unwrap() // FIXME: handle sad path
};
@ -283,15 +276,16 @@ impl View {
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, O::TxId>,
) -> Result<Block<O::TxId>, ()>
leadership: &Leadership<Tx>,
) -> Result<Block<<Tx as Transaction>::Hash>, ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
for<'t> &'t Tx: Into<O::TxId>,
O: Overlay<A, F, T>,
Tx: Transaction,
Tx::Hash: Debug,
O: Overlay<A, F, T, Tx::Hash>,
{
let overlay = O::new(self, node_id);
@ -309,18 +303,20 @@ impl View {
Ok(block)
}
async fn resolve_non_leader<'view, A, O, F, T>(
async fn resolve_non_leader<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
adapter: &A,
fountain: &F,
tally: &T,
) -> Result<(Block<O::TxId>, View), ()>
) -> Result<(Block<Tx::Hash>, View), ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
T: Tally + Send + Sync + 'static,
O: Overlay<A, F, T>,
Tx: Transaction,
Tx::Hash: Debug,
O: Overlay<A, F, T, Tx::Hash>,
{
let overlay = O::new(self, node_id);
// Consensus in Carnot is achieved in 2 steps from the point of view of a node:
@ -360,12 +356,12 @@ impl View {
}
// Verifies the block is new and the previous leader did not fail
fn pipelined_safe_block<TxId: Eq + Hash>(&self, _: &Block<TxId>) -> bool {
fn pipelined_safe_block<TxId: Clone + Eq + Hash>(&self, _: &Block<TxId>) -> bool {
// return b.view_n >= self.view_n && b.view_n == b.qc.view_n
true
}
fn generate_next_view<TxId: Eq + Hash>(&self, _b: &Block<TxId>) -> View {
fn generate_next_view<TxId: Clone + Eq + Hash>(&self, _b: &Block<TxId>) -> View {
let mut seed = self.seed;
seed[0] += 1;
View {

View File

@ -10,44 +10,38 @@ use crate::network::messages::ProposalChunkMsg;
use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member
pub struct Member<TxId: Eq + Hash, const C: usize> {
pub struct Member<const C: usize> {
// id is not used now, but gonna probably used it for later checking later on
#[allow(dead_code)]
id: NodeId,
committee: Committee,
committees: Committees<TxId, C>,
committees: Committees<C>,
view_n: u64,
_marker: std::marker::PhantomData<TxId>,
}
/// #Just a newtype index to be able to implement parent/children methods
#[derive(Copy, Clone)]
pub struct Committee(usize);
pub struct Committees<TxId: Eq + Hash, const C: usize> {
pub struct Committees<const C: usize> {
nodes: Box<[NodeId]>,
_marker: std::marker::PhantomData<TxId>,
}
impl<TxId: Eq + Hash, const C: usize> Committees<TxId, C> {
impl<const C: usize> Committees<C> {
pub fn new(view: &View) -> Self {
let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
nodes.shuffle(&mut rng);
Self {
nodes,
_marker: std::marker::PhantomData,
}
Self { nodes }
}
pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<TxId, C>> {
pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<C>> {
let member_idx = self.nodes.iter().position(|m| m == &id)?;
Some(Member {
committee: Committee(member_idx / C),
committees: self,
id,
view_n: view.view_n,
_marker: std::marker::PhantomData,
})
}
@ -92,7 +86,7 @@ impl Committee {
}
}
impl<TxId: Eq + Hash, const C: usize> Member<TxId, C> {
impl<const C: usize> Member<C> {
/// Return other members of this committee
pub fn peers(&self) -> &[NodeId] {
self.committees
@ -117,15 +111,14 @@ impl<TxId: Eq + Hash, const C: usize> Member<TxId, C> {
}
#[async_trait::async_trait]
impl<Network, Fountain, VoteTally, TxId, const C: usize> Overlay<Network, Fountain, VoteTally>
for Member<TxId, C>
impl<Network, Fountain, VoteTally, TxId, const C: usize> Overlay<Network, Fountain, VoteTally, TxId>
for Member<C>
where
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
TxId: serde::de::DeserializeOwned + Eq + Hash + Clone + Send + Sync + 'static,
TxId: serde::de::DeserializeOwned + Clone + Hash + Eq + Send + Sync + 'static,
{
type TxId = TxId;
// we still need view here to help us initialize
fn new(view: &View, node: NodeId) -> Self {
let committees = Committees::new(view);
@ -137,13 +130,13 @@ where
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block<Self::TxId>, FountainError> {
) -> Result<Block<TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let committee = self.committee;
let message_stream = adapter.proposal_chunks_stream(committee, view).await;
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<Self::TxId>>()
.deserialize::<Block<TxId>>()
.map_err(|e| FountainError::from(e.to_string().as_str()))
})
}
@ -151,7 +144,7 @@ where
async fn broadcast_block(
&self,
view: &View,
block: Block<Self::TxId>,
block: Block<TxId>,
adapter: &Network,
fountain: &Fountain,
) {
@ -180,7 +173,7 @@ where
async fn approve_and_forward(
&self,
view: &View,
_block: &Block<Self::TxId>,
_block: &Block<TxId>,
_adapter: &Network,
_tally: &VoteTally,
_next_view: &View,

View File

@ -25,7 +25,7 @@ pub struct Flat<TxId> {
_marker: std::marker::PhantomData<TxId>,
}
impl<TxId: Eq + Hash> Flat<TxId> {
impl<TxId: Clone + Eq + Hash> Flat<TxId> {
pub fn new(view_n: u64, node_id: NodeId) -> Self {
Self {
node_id,
@ -41,7 +41,7 @@ impl<TxId: Eq + Hash> Flat<TxId> {
}
#[async_trait::async_trait]
impl<Network, Fountain, VoteTally, TxId> Overlay<Network, Fountain, VoteTally> for Flat<TxId>
impl<Network, Fountain, VoteTally, TxId> Overlay<Network, Fountain, VoteTally, TxId> for Flat<TxId>
where
TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
Network: NetworkAdapter + Sync,
@ -49,8 +49,6 @@ where
VoteTally: Tally + Sync,
VoteTally::Vote: Serialize + DeserializeOwned + Send,
{
type TxId = TxId;
fn new(view: &View, node: NodeId) -> Self {
Flat::new(view.view_n, node)
}
@ -60,12 +58,12 @@ where
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block<Self::TxId>, FountainError> {
) -> Result<Block<TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await;
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<Self::TxId>>()
.deserialize::<Block<TxId>>()
.map_err(|e| FountainError::from(e.to_string().as_str()))
})
}
@ -73,7 +71,7 @@ where
async fn broadcast_block(
&self,
view: &View,
block: Block<Self::TxId>,
block: Block<TxId>,
adapter: &Network,
fountain: &Fountain,
) {
@ -93,7 +91,7 @@ where
async fn approve_and_forward(
&self,
view: &View,
block: &Block<Self::TxId>,
block: &Block<TxId>,
adapter: &Network,
_tally: &VoteTally,
_next_view: &View,

View File

@ -3,6 +3,7 @@ mod flat;
// std
use std::error::Error;
use std::hash::Hash;
// crates
// internal
use super::{Approval, NodeId, View};
@ -14,9 +15,13 @@ use nomos_core::vote::Tally;
/// Dissemination overlay, tied to a specific view
#[async_trait::async_trait]
pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Tally> {
type TxId: serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash + Send + Sync + 'static;
pub trait Overlay<
Network: NetworkAdapter,
Fountain: FountainCode,
VoteTally: Tally,
TxId: Clone + Eq + Hash,
>
{
fn new(view: &View, node: NodeId) -> Self;
async fn reconstruct_proposal_block(
@ -24,11 +29,11 @@ pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Ta
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block<Self::TxId>, FountainError>;
) -> Result<Block<TxId>, FountainError>;
async fn broadcast_block(
&self,
view: &View,
block: Block<Self::TxId>,
block: Block<TxId>,
adapter: &Network,
fountain: &Fountain,
);
@ -38,7 +43,7 @@ pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Ta
async fn approve_and_forward(
&self,
view: &View,
block: &Block<Self::TxId>,
block: &Block<TxId>,
adapter: &Network,
vote_tally: &VoteTally,
next_view: &View,

View File

@ -7,19 +7,20 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH};
// internal
use crate::backend::{MemPool, MempoolError};
use nomos_core::block::{BlockHeader, BlockId};
use nomos_core::tx::Transaction;
/// A mock mempool implementation that stores all transactions in memory in the order received.
pub struct MockPool<Id, Tx> {
pending_txs: LinkedHashMap<Id, Tx>,
pub struct MockPool<Tx: Transaction>
where
Tx::Hash: Hash,
{
pending_txs: LinkedHashMap<Tx::Hash, Tx>,
in_block_txs: BTreeMap<BlockId, Vec<Tx>>,
in_block_txs_by_id: BTreeMap<Id, BlockId>,
in_block_txs_by_id: BTreeMap<Tx::Hash, BlockId>,
last_tx_timestamp: u64,
}
impl<Id, Tx> Default for MockPool<Id, Tx>
where
Id: Eq + Hash,
{
impl<Tx: Transaction> Default for MockPool<Tx> {
fn default() -> Self {
Self {
pending_txs: LinkedHashMap::new(),
@ -30,30 +31,29 @@ where
}
}
impl<Id, Tx> MockPool<Id, Tx>
impl<Tx: Transaction> MockPool<Tx>
where
Id: Eq + Hash,
Tx::Hash: Ord,
{
pub fn new() -> Self {
Default::default()
}
}
impl<Id, Tx> MemPool for MockPool<Id, Tx>
impl<Tx> MemPool for MockPool<Tx>
where
Id: for<'t> From<&'t Tx> + PartialOrd + Ord + Eq + Hash + Clone,
Tx: Clone + Send + Sync + 'static + Hash,
Tx: Transaction + Clone + Send + Sync + 'static + Hash,
Tx::Hash: Ord,
{
type Settings = ();
type Tx = Tx;
type Id = Id;
fn new(_settings: Self::Settings) -> Self {
Self::new()
}
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> {
let id = Id::from(&tx);
let id = <Self::Tx as Transaction>::hash(&tx);
if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) {
return Err(MempoolError::ExistingTx);
}
@ -73,7 +73,7 @@ where
Box::new(pending_txs.into_iter())
}
fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader) {
fn mark_in_block(&mut self, txs: &[<Self::Tx as Transaction>::Hash], block: BlockHeader) {
let mut txs_in_block = Vec::with_capacity(txs.len());
for tx_id in txs.iter() {
if let Some(tx) = self.pending_txs.remove(tx_id) {
@ -96,7 +96,7 @@ where
})
}
fn prune(&mut self, txs: &[Self::Id]) {
fn prune(&mut self, txs: &[<Self::Tx as Transaction>::Hash]) {
for tx_id in txs {
self.pending_txs.remove(tx_id);
}

View File

@ -2,6 +2,7 @@
pub mod mockpool;
use nomos_core::block::{BlockHeader, BlockId};
use nomos_core::tx::Transaction;
#[derive(thiserror::Error, Debug)]
pub enum MempoolError {
@ -13,8 +14,7 @@ pub enum MempoolError {
pub trait MemPool {
type Settings: Clone;
type Tx;
type Id;
type Tx: Transaction;
/// Construct a new empty pool
fn new(settings: Self::Settings) -> Self;
@ -30,7 +30,7 @@ pub trait MemPool {
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Tx> + Send>;
/// Record that a set of transactions were included in a block
fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader);
fn mark_in_block(&mut self, txs: &[<Self::Tx as Transaction>::Hash], block: BlockHeader);
/// Returns all of the transactions for the block
#[cfg(test)]
@ -41,7 +41,7 @@ pub trait MemPool {
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
fn prune(&mut self, txs: &[Self::Id]);
fn prune(&mut self, txs: &[<Self::Tx as Transaction>::Hash]);
fn pending_tx_count(&self) -> usize;
fn last_tx_timestamp(&self) -> u64;

View File

@ -11,6 +11,7 @@ use tokio::sync::oneshot::Sender;
use crate::network::NetworkAdapter;
use backend::MemPool;
use nomos_core::block::{BlockHeader, BlockId};
use nomos_core::tx::Transaction;
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -25,7 +26,7 @@ where
P: MemPool,
P::Settings: Clone,
P::Tx: Debug + 'static,
P::Id: Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
@ -37,7 +38,7 @@ pub struct MempoolMetrics {
pub last_tx_timestamp: u64,
}
pub enum MempoolMsg<Tx, Id> {
pub enum MempoolMsg<Tx: Transaction> {
AddTx {
tx: Tx,
reply_channel: Sender<Result<(), ()>>,
@ -47,7 +48,7 @@ pub enum MempoolMsg<Tx, Id> {
reply_channel: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
},
Prune {
ids: Vec<Id>,
ids: Vec<Tx::Hash>,
},
#[cfg(test)]
BlockTransaction {
@ -55,7 +56,7 @@ pub enum MempoolMsg<Tx, Id> {
reply_channel: Sender<Option<Box<dyn Iterator<Item = Tx> + Send>>>,
},
MarkInBlock {
ids: Vec<Id>,
ids: Vec<Tx::Hash>,
block: BlockHeader,
},
Metrics {
@ -63,7 +64,10 @@ pub enum MempoolMsg<Tx, Id> {
},
}
impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
impl<Tx: Transaction + Debug> Debug for MempoolMsg<Tx>
where
Tx::Hash: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> {
match self {
Self::View { ancestor_hint, .. } => {
@ -86,7 +90,7 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
}
}
impl<Tx: 'static, Id: 'static> RelayMessage for MempoolMsg<Tx, Id> {}
impl<Tx: Transaction + 'static> RelayMessage for MempoolMsg<Tx> {}
impl<N, P> ServiceData for MempoolService<N, P>
where
@ -94,13 +98,13 @@ where
P: MemPool,
P::Settings: Clone,
P::Tx: Debug + 'static,
P::Id: Debug + 'static,
<P::Tx as Transaction>::Hash: Debug,
{
const SERVICE_ID: ServiceId = "Mempool";
type Settings = P::Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::Tx, <P as MemPool>::Id>;
type Message = MempoolMsg<<P as MemPool>::Tx>;
}
#[async_trait::async_trait]
@ -108,8 +112,8 @@ impl<N, P> ServiceCore for MempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
P::Id: Debug + Send + 'static,
P::Tx: Clone + Debug + Send + Sync + 'static,
P::Tx: Transaction + Clone + Debug + Send + Sync + 'static,
<P::Tx as Transaction>::Hash: Debug + Send + Sync + 'static,
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {

View File

@ -1,7 +1,4 @@
use nomos_core::{
block::BlockId,
tx::mock::{MockTransaction, MockTxId},
};
use nomos_core::{block::BlockId, tx::mock::MockTransaction};
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage},
@ -20,7 +17,7 @@ use nomos_mempool::{
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTxId, MockTransaction>>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTransaction>>>,
}
#[test]
@ -70,7 +67,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter, MockPool<MockTxId, MockTransaction>>>();
.relay::<MempoolService<MockAdapter, MockPool<MockTransaction>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();