Add generic block (#93)

* add generic block

* fix PR comments

* Block uses tx hashes

* Refactor bounds and generics to accept block type

* remove Tx generics

* add generic for block

* Remove unnecessary bounds on leadership

* Impl from with ownership for mock tx and txid

* feature gate

---------

Co-authored-by: danielsanchezq <sanchez.quiros.daniel@gmail.com>
This commit is contained in:
Al Liu 2023-03-15 00:55:08 +08:00 committed by GitHub
parent 6ce9fdf553
commit 818d7f29cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 313 additions and 146 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@ -18,6 +18,7 @@ serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
indexmap = { version = "1.9", features = ["serde-1"] }
blake2 = { version = "0.10", optional = true }
serde_json = { version = "1", optional = true }

View File

@ -1,36 +1,53 @@
use indexmap::IndexSet;
// std
use core::hash::Hash;
// crates
use bytes::Bytes;
use serde::{Deserialize, Serialize};
// internal
pub type TxHash = [u8; 32];
/// A block
#[derive(Clone, Debug)]
pub struct Block;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Block<TxId: Eq + Hash> {
header: BlockHeader,
transactions: IndexSet<TxId>,
}
/// A block header
#[derive(Clone, Debug)]
pub struct BlockHeader;
#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
pub struct BlockHeader {
id: BlockId,
}
/// Identifier of a block
pub type BlockId = [u8; 32];
impl Block {
impl<TxId: Eq + Hash> Block<TxId> {
pub fn new(header: BlockHeader, txs: impl Iterator<Item = TxId>) -> Self {
Self {
header,
transactions: txs.collect(),
}
}
/// Encode block into bytes
pub fn as_bytes(&self) -> Bytes {
Bytes::new()
}
pub fn from_bytes(_: Bytes) -> Self {
Self
pub fn header(&self) -> BlockHeader {
self.header
}
pub fn header(&self) -> BlockHeader {
BlockHeader
pub fn transactions(&self) -> impl Iterator<Item = &TxId> + '_ {
self.transactions.iter()
}
}
impl BlockHeader {
pub fn id(&self) -> BlockId {
todo!()
self.id
}
}

View File

@ -0,0 +1,10 @@
// std
// crates
use serde::{Deserialize, Serialize};
// internal
pub use crate::tx::transaction::Transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Tx {
Transfer(Transaction),
}

View File

@ -1,23 +1,77 @@
use crate::wire::serialize;
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
use nomos_network::backends::mock::MockMessage;
#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize)]
pub enum MockTransactionMsg {
Request(nomos_network::backends::mock::MockMessage),
Response(nomos_network::backends::mock::MockMessage),
#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
pub struct MockTransaction {
id: MockTxId,
content: MockMessage,
}
#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)]
impl MockTransaction {
pub fn new(content: MockMessage) -> Self {
let id = MockTxId::from(content.clone());
Self { id, content }
}
pub fn message(&self) -> &MockMessage {
&self.content
}
}
impl From<nomos_network::backends::mock::MockMessage> for MockTransaction {
fn from(msg: nomos_network::backends::mock::MockMessage) -> Self {
let id = MockTxId::from(msg.clone());
Self { id, content: msg }
}
}
#[derive(
Debug, Eq, Hash, PartialEq, Ord, Copy, Clone, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub struct MockTxId([u8; 32]);
impl From<&MockTransactionMsg> for MockTxId {
fn from(tx: &MockTransactionMsg) -> Self {
impl From<[u8; 32]> for MockTxId {
fn from(tx_id: [u8; 32]) -> Self {
Self(tx_id)
}
}
impl core::ops::Deref for MockTxId {
type Target = [u8; 32];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<[u8]> for MockTxId {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl MockTxId {
pub fn new(tx_id: [u8; 32]) -> MockTxId {
MockTxId(tx_id)
}
}
impl From<nomos_network::backends::mock::MockMessage> for MockTxId {
fn from(msg: nomos_network::backends::mock::MockMessage) -> Self {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(serde_json::to_string(tx).unwrap().as_bytes());
hasher.update(&serialize(&msg).unwrap());
let mut id = [0u8; 32];
hasher.finalize_variable(&mut id).unwrap();
Self(id)
}
}
impl From<&MockTransaction> for MockTxId {
fn from(msg: &MockTransaction) -> Self {
msg.id
}
}

View File

@ -1,11 +1,4 @@
pub mod carnot;
#[cfg(feature = "mock")]
pub mod mock;
mod transaction;
use serde::{Deserialize, Serialize};
pub use transaction::Transaction;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Tx {
Transfer(Transaction),
}

View File

@ -16,7 +16,7 @@ nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
rand_chacha = "0.3"
rand = "0.8"
serde = "1.0"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-rc.2", optional = true}

View File

@ -2,7 +2,7 @@
use std::marker::PhantomData;
// crates
// internal
use nomos_core::crypto::PrivateKey;
use nomos_core::{block::BlockHeader, crypto::PrivateKey};
use nomos_mempool::MempoolMsg;
use super::*;
@ -17,9 +17,9 @@ pub struct Leadership<Tx, Id> {
mempool: OutboundRelay<MempoolMsg<Tx, Id>>,
}
pub enum LeadershipResult<'view> {
pub enum LeadershipResult<'view, TxId: Eq + core::hash::Hash> {
Leader {
block: Block,
block: Block<TxId>,
_view: PhantomData<&'view u8>,
},
NotLeader {
@ -27,7 +27,11 @@ pub enum LeadershipResult<'view> {
},
}
impl<Tx, Id> Leadership<Tx, Id> {
impl<Tx, Id> Leadership<Tx, Id>
where
Id: Eq + core::hash::Hash,
for<'t> &'t Tx: Into<Id>, // TODO: we should probably abstract this away but for now the constrain may do
{
pub fn new(key: PrivateKey, mempool: OutboundRelay<MempoolMsg<Tx, Id>>) -> Self {
Self {
key: Enclave { key },
@ -41,19 +45,21 @@ impl<Tx, Id> Leadership<Tx, Id> {
view: &'view View,
tip: &Tip,
qc: Qc,
) -> LeadershipResult<'view> {
let ancestor_hint = todo!("get the ancestor from the tip");
) -> LeadershipResult<'view, Id> {
// TODO: get the correct ancestor for the tip
// let ancestor_hint = todo!("get the ancestor from the tip");
let ancestor_hint = [0; 32];
if view.is_leader(self.key.key) {
let (tx, rx) = tokio::sync::oneshot::channel();
self.mempool.send(MempoolMsg::View {
ancestor_hint,
reply_channel: tx,
});
let _iter = rx.await;
let iter = rx.await.unwrap();
LeadershipResult::Leader {
_view: PhantomData,
block: todo!("form a block from the returned iterator"),
block: Block::new(BlockHeader::default(), iter.map(|ref tx| tx.into())),
}
} else {
LeadershipResult::NotLeader { _view: PhantomData }

View File

@ -7,20 +7,18 @@
mod leadership;
mod network;
pub mod overlay;
#[cfg(test)]
mod test;
mod tip;
// std
use std::collections::BTreeMap;
use std::error::Error;
use std::fmt::Debug;
use std::hash::Hash;
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::network::NetworkAdapter;
use leadership::{Leadership, LeadershipResult};
use nomos_core::block::Block;
use nomos_core::block::{Block, TxHash};
use nomos_core::crypto::PublicKey;
use nomos_core::fountain::FountainCode;
use nomos_core::staking::Stake;
@ -42,6 +40,7 @@ pub type NodeId = PublicKey;
// Random seed for each round provided by the protocol
pub type Seed = [u8; 32];
#[derive(Debug)]
pub struct CarnotSettings<Fountain: FountainCode, VoteTally: Tally> {
private_key: [u8; 32],
fountain_settings: Fountain::Settings,
@ -123,10 +122,19 @@ where
T::Settings: Send + Sync + 'static,
T::Outcome: Send + Sync,
P::Settings: Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
P::Tx: Debug + Clone + serde::de::DeserializeOwned + Send + Sync + 'static,
for<'t> &'t P::Tx: Into<TxHash>,
P::Id: Debug
+ Clone
+ serde::de::DeserializeOwned
+ for<'a> From<&'a P::Tx>
+ Eq
+ Hash
+ Send
+ Sync
+ 'static,
M: MempoolAdapter<Tx = P::Tx> + Send + Sync + 'static,
O: Overlay<A, F, T> + Send + Sync + 'static,
O: Overlay<A, F, T, TxId = P::Id> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -167,7 +175,7 @@ where
let fountain = F::new(fountain_settings);
let tally = T::new(tally_settings);
let leadership = Leadership::new(private_key, mempool_relay);
let leadership = Leadership::<P::Tx, P::Id>::new(private_key, mempool_relay.clone());
// FIXME: this should be taken from config
let mut cur_view = View {
seed: [0; 32],
@ -180,7 +188,7 @@ where
// FIXME: this should probably have a timer to detect failed rounds
let res = cur_view
.resolve::<A, O, _, _, _, _>(
.resolve::<A, O, _, _, _>(
private_key,
&tip,
&network_adapter,
@ -190,10 +198,22 @@ where
)
.await;
match res {
Ok((_block, view)) => {
Ok((block, view)) => {
// resolved block, mark as verified and possibly update the tip
// not sure what mark as verified means, e.g. if we want an event subscription
// system for this to be used for example by the ledger, storage and mempool
mempool_relay
.send(nomos_mempool::MempoolMsg::MarkInBlock {
ids: block.transactions().cloned().collect(),
block: block.header(),
})
.await
.map_err(|(e, _)| {
tracing::error!("Error while sending MarkInBlock message: {}", e);
e
})?;
cur_view = view;
}
Err(e) => {
@ -217,27 +237,26 @@ pub struct View {
impl View {
// TODO: might want to encode steps in the type system
pub async fn resolve<'view, A, O, F, T, Tx, Id>(
pub async fn resolve<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, Id>,
) -> Result<(Block, View), Box<dyn Error>>
leadership: &Leadership<Tx, O::TxId>,
) -> Result<(Block<O::TxId>, View), Box<dyn std::error::Error + Send + Sync + 'static>>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
for<'t> &'t Tx: Into<O::TxId>,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
O: Overlay<A, F, T>,
{
let res = if self.is_leader(node_id) {
let block = self
.resolve_leader::<A, O, F, T, _, _>(
node_id, tip, adapter, fountain, tally, leadership,
)
.resolve_leader::<A, O, F, T, _>(node_id, tip, adapter, fountain, tally, leadership)
.await
.unwrap(); // FIXME: handle sad path
let next_view = self.generate_next_view(&block);
@ -257,20 +276,21 @@ impl View {
Ok(res)
}
async fn resolve_leader<'view, A, O, F, T, Tx, Id>(
async fn resolve_leader<'view, A, O, F, T, Tx>(
&'view self,
node_id: NodeId,
tip: &Tip,
adapter: &A,
fountain: &F,
tally: &T,
leadership: &Leadership<Tx, Id>,
) -> Result<Block, ()>
leadership: &Leadership<Tx, O::TxId>,
) -> Result<Block<O::TxId>, ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
T: Tally + Send + Sync + 'static,
T::Outcome: Send + Sync,
for<'t> &'t Tx: Into<O::TxId>,
O: Overlay<A, F, T>,
{
let overlay = O::new(self, node_id);
@ -295,7 +315,7 @@ impl View {
adapter: &A,
fountain: &F,
tally: &T,
) -> Result<(Block, View), ()>
) -> Result<(Block<O::TxId>, View), ()>
where
A: NetworkAdapter + Send + Sync + 'static,
F: FountainCode,
@ -332,7 +352,7 @@ impl View {
}
pub fn is_leader(&self, _node_id: NodeId) -> bool {
false
true
}
pub fn id(&self) -> u64 {
@ -340,12 +360,12 @@ impl View {
}
// Verifies the block is new and the previous leader did not fail
fn pipelined_safe_block(&self, _: &Block) -> bool {
fn pipelined_safe_block<TxId: Eq + Hash>(&self, _: &Block<TxId>) -> bool {
// return b.view_n >= self.view_n && b.view_n == b.qc.view_n
true
}
fn generate_next_view(&self, _b: &Block) -> View {
fn generate_next_view<TxId: Eq + Hash>(&self, _b: &Block<TxId>) -> View {
let mut seed = self.seed;
seed[0] += 1;
View {

View File

@ -108,7 +108,7 @@ impl NetworkAdapter for MockAdapter {
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: message,
topic: MOCK_PUB_SUB_TOPIC,
topic: MOCK_PUB_SUB_TOPIC.to_string(),
}))
.await
{
@ -164,7 +164,7 @@ impl NetworkAdapter for MockAdapter {
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: message,
topic: MOCK_PUB_SUB_TOPIC,
topic: MOCK_PUB_SUB_TOPIC.to_string(),
}))
.await
{

View File

@ -1,6 +1,8 @@
// std
use std::hash::Hash;
// crates
use futures::StreamExt;
use nomos_core::wire::deserializer;
use rand::{seq::SliceRandom, SeedableRng};
// internal
use super::*;
@ -8,38 +10,44 @@ use crate::network::messages::ProposalChunkMsg;
use crate::network::NetworkAdapter;
/// View of the tree overlay centered around a specific member
pub struct Member<const C: usize> {
pub struct Member<TxId: Eq + Hash, const C: usize> {
// id is not used now, but gonna probably used it for later checking later on
#[allow(dead_code)]
id: NodeId,
committee: Committee,
committees: Committees<C>,
committees: Committees<TxId, C>,
view_n: u64,
_marker: std::marker::PhantomData<TxId>,
}
/// #Just a newtype index to be able to implement parent/children methods
#[derive(Copy, Clone)]
pub struct Committee(usize);
pub struct Committees<const C: usize> {
pub struct Committees<TxId: Eq + Hash, const C: usize> {
nodes: Box<[NodeId]>,
_marker: std::marker::PhantomData<TxId>,
}
impl<const C: usize> Committees<C> {
impl<TxId: Eq + Hash, const C: usize> Committees<TxId, C> {
pub fn new(view: &View) -> Self {
let mut nodes = view.staking_keys.keys().cloned().collect::<Box<[NodeId]>>();
let mut rng = rand_chacha::ChaCha20Rng::from_seed(view.seed);
nodes.shuffle(&mut rng);
Self { nodes }
Self {
nodes,
_marker: std::marker::PhantomData,
}
}
pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<C>> {
pub fn into_member(self, id: NodeId, view: &View) -> Option<Member<TxId, C>> {
let member_idx = self.nodes.iter().position(|m| m == &id)?;
Some(Member {
committee: Committee(member_idx / C),
committees: self,
id,
view_n: view.view_n,
_marker: std::marker::PhantomData,
})
}
@ -84,7 +92,7 @@ impl Committee {
}
}
impl<const C: usize> Member<C> {
impl<TxId: Eq + Hash, const C: usize> Member<TxId, C> {
/// Return other members of this committee
pub fn peers(&self) -> &[NodeId] {
self.committees
@ -109,13 +117,15 @@ impl<const C: usize> Member<C> {
}
#[async_trait::async_trait]
impl<
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
const C: usize,
> Overlay<Network, Fountain, VoteTally> for Member<C>
impl<Network, Fountain, VoteTally, TxId, const C: usize> Overlay<Network, Fountain, VoteTally>
for Member<TxId, C>
where
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
TxId: serde::de::DeserializeOwned + Eq + Hash + Clone + Send + Sync + 'static,
{
type TxId = TxId;
// we still need view here to help us initialize
fn new(view: &View, node: NodeId) -> Self {
let committees = Committees::new(view);
@ -127,17 +137,21 @@ impl<
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError> {
) -> Result<Block<Self::TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let committee = self.committee;
let message_stream = adapter.proposal_chunks_stream(committee, view).await;
fountain.decode(message_stream).await.map(Block::from_bytes)
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<Self::TxId>>()
.map_err(|e| FountainError::from(e.to_string().as_str()))
})
}
async fn broadcast_block(
&self,
view: &View,
block: Block,
block: Block<Self::TxId>,
adapter: &Network,
fountain: &Fountain,
) {
@ -166,7 +180,7 @@ impl<
async fn approve_and_forward(
&self,
view: &View,
_block: &Block,
_block: &Block<Self::TxId>,
_adapter: &Network,
_tally: &VoteTally,
_next_view: &View,

View File

@ -1,5 +1,6 @@
// std
use std::error::Error;
use std::hash::Hash;
// crates
use futures::StreamExt;
use serde::de::DeserializeOwned;
@ -9,6 +10,7 @@ use super::*;
use crate::network::messages::{ProposalChunkMsg, VoteMsg};
use crate::network::NetworkAdapter;
use crate::overlay::committees::Committee;
use nomos_core::wire::deserializer;
const FLAT_COMMITTEE: Committee = Committee::root();
@ -16,31 +18,39 @@ const FLAT_COMMITTEE: Committee = Committee::root();
/// As far as the API is concerned, this should be equivalent to any other
/// overlay and far simpler to implement.
/// For this reason, this might act as a 'reference' overlay for testing.
pub struct Flat {
pub struct Flat<TxId> {
// TODO: this should be a const param, but we can't do that yet
node_id: NodeId,
view_n: u64,
_marker: std::marker::PhantomData<TxId>,
}
impl Flat {
impl<TxId: Eq + Hash> Flat<TxId> {
pub fn new(view_n: u64, node_id: NodeId) -> Self {
Self { node_id, view_n }
Self {
node_id,
view_n,
_marker: Default::default(),
}
}
fn approve(&self, _block: &Block) -> Approval {
fn approve(&self, _block: &Block<TxId>) -> Approval {
// we still need to define how votes look like
todo!()
Approval
}
}
#[async_trait::async_trait]
impl<Network, Fountain, VoteTally> Overlay<Network, Fountain, VoteTally> for Flat
impl<Network, Fountain, VoteTally, TxId> Overlay<Network, Fountain, VoteTally> for Flat<TxId>
where
TxId: serde::de::DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
Network: NetworkAdapter + Sync,
Fountain: FountainCode + Sync,
VoteTally: Tally + Sync,
VoteTally::Vote: Serialize + DeserializeOwned + Send,
{
type TxId = TxId;
fn new(view: &View, node: NodeId) -> Self {
Flat::new(view.view_n, node)
}
@ -50,16 +60,20 @@ where
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError> {
) -> Result<Block<Self::TxId>, FountainError> {
assert_eq!(view.view_n, self.view_n, "view_n mismatch");
let message_stream = adapter.proposal_chunks_stream(FLAT_COMMITTEE, view).await;
fountain.decode(message_stream).await.map(Block::from_bytes)
fountain.decode(message_stream).await.and_then(|b| {
deserializer(&b)
.deserialize::<Block<Self::TxId>>()
.map_err(|e| FountainError::from(e.to_string().as_str()))
})
}
async fn broadcast_block(
&self,
view: &View,
block: Block,
block: Block<Self::TxId>,
adapter: &Network,
fountain: &Fountain,
) {
@ -79,7 +93,7 @@ where
async fn approve_and_forward(
&self,
view: &View,
block: &Block,
block: &Block<Self::TxId>,
adapter: &Network,
_tally: &VoteTally,
_next_view: &View,

View File

@ -15,6 +15,8 @@ use nomos_core::vote::Tally;
/// Dissemination overlay, tied to a specific view
#[async_trait::async_trait]
pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Tally> {
type TxId: serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash + Send + Sync + 'static;
fn new(view: &View, node: NodeId) -> Self;
async fn reconstruct_proposal_block(
@ -22,11 +24,11 @@ pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Ta
view: &View,
adapter: &Network,
fountain: &Fountain,
) -> Result<Block, FountainError>;
) -> Result<Block<Self::TxId>, FountainError>;
async fn broadcast_block(
&self,
view: &View,
block: Block,
block: Block<Self::TxId>,
adapter: &Network,
fountain: &Fountain,
);
@ -36,7 +38,7 @@ pub trait Overlay<Network: NetworkAdapter, Fountain: FountainCode, VoteTally: Ta
async fn approve_and_forward(
&self,
view: &View,
block: &Block,
block: &Block<Self::TxId>,
adapter: &Network,
vote_tally: &VoteTally,
next_view: &View,

View File

@ -18,7 +18,7 @@ rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-rc.2", optional = true}

View File

@ -86,6 +86,16 @@ where
block_entry.append(&mut txs_in_block);
}
#[cfg(test)]
fn block_transactions(
&self,
block: BlockId,
) -> Option<Box<dyn Iterator<Item = Self::Tx> + Send>> {
self.in_block_txs.get(&block).map(|txs| {
Box::new(txs.clone().into_iter()) as Box<dyn Iterator<Item = Self::Tx> + Send>
})
}
fn prune(&mut self, txs: &[Self::Id]) {
for tx_id in txs {
self.pending_txs.remove(tx_id);

View File

@ -32,6 +32,13 @@ pub trait MemPool {
/// Record that a set of transactions were included in a block
fn mark_in_block(&mut self, txs: &[Self::Id], block: BlockHeader);
/// Returns all of the transactions for the block
#[cfg(test)]
fn block_transactions(
&self,
block: BlockId,
) -> Option<Box<dyn Iterator<Item = Self::Tx> + Send>>;
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
fn prune(&mut self, txs: &[Self::Id]);

View File

@ -49,6 +49,11 @@ pub enum MempoolMsg<Tx, Id> {
Prune {
ids: Vec<Id>,
},
#[cfg(test)]
BlockTransaction {
block: BlockId,
reply_channel: Sender<Option<Box<dyn Iterator<Item = Tx> + Send>>>,
},
MarkInBlock {
ids: Vec<Id>,
block: BlockHeader,
@ -72,6 +77,10 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
"MempoolMsg::MarkInBlock{{ids: {ids:?}, block: {block:?}}}"
)
}
#[cfg(test)]
Self::BlockTransaction { block, .. } => {
write!(f, "MempoolMsg::BlockTransaction{{block: {block:?}}}")
}
Self::Metrics { .. } => write!(f, "MempoolMsg::Metrics"),
}
}
@ -152,6 +161,12 @@ where
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(&ids, block);
}
#[cfg(test)]
MempoolMsg::BlockTransaction { block, reply_channel } => {
reply_channel.send(pool.block_transactions(block)).unwrap_or_else(|_| {
tracing::debug!("could not send back block transactions")
});
}
MempoolMsg::Prune { ids } => { pool.prune(&ids); },
MempoolMsg::Metrics { reply_channel } => {
let metrics = MempoolMetrics {

View File

@ -2,7 +2,7 @@
// crates
use futures::{Stream, StreamExt};
use nomos_core::tx::mock::MockTransactionMsg;
use nomos_core::tx::mock::MockTransaction;
use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent,
};
@ -25,7 +25,7 @@ pub struct MockAdapter {
#[async_trait::async_trait]
impl NetworkAdapter for MockAdapter {
type Backend = Mock;
type Tx = MockTransactionMsg;
type Tx = MockTransaction;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
@ -48,7 +48,7 @@ impl NetworkAdapter for MockAdapter {
if let Err((e, _)) = network_relay
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: MOCK_PUB_SUB_TOPIC,
topic: MOCK_PUB_SUB_TOPIC.to_string(),
}))
.await
{
@ -76,10 +76,10 @@ impl NetworkAdapter for MockAdapter {
match event {
Ok(NetworkEvent::RawMessage(message)) => {
tracing::info!("Received message: {:?}", message.payload());
if message.content_topic() == MOCK_TX_CONTENT_TOPIC {
Some(MockTransactionMsg::Request(message))
if message.content_topic().eq(&MOCK_TX_CONTENT_TOPIC) {
Some(MockTransaction::new(message))
} else {
Some(MockTransactionMsg::Response(message))
None
}
}
Err(_e) => None,

View File

@ -1,6 +1,6 @@
use nomos_core::{
block::BlockId,
tx::mock::{MockTransactionMsg, MockTxId},
tx::mock::{MockTransaction, MockTxId},
};
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
@ -20,7 +20,7 @@ use nomos_mempool::{
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTxId, MockTransactionMsg>>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTxId, MockTransaction>>>,
}
#[test]
@ -70,7 +70,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter, MockPool<MockTxId, MockTransactionMsg>>>();
.relay::<MempoolService<MockAdapter, MockPool<MockTxId, MockTransaction>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();
@ -79,7 +79,7 @@ fn test_mockmempool() {
// subscribe to the mock content topic
network_outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: MOCK_TX_CONTENT_TOPIC.content_topic_name,
topic: MOCK_TX_CONTENT_TOPIC.content_topic_name.to_string(),
}))
.await
.unwrap();
@ -99,14 +99,7 @@ fn test_mockmempool() {
let items = mrx
.await
.unwrap()
.into_iter()
.filter_map(|tx| {
if let MockTransactionMsg::Request(msg) = tx {
Some(msg)
} else {
None
}
})
.map(|msg| msg.message().clone())
.collect::<std::collections::HashSet<_>>();
if items.len() == exp_txns.len() {

View File

@ -11,6 +11,7 @@ use rand::{
};
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
@ -21,11 +22,11 @@ const BROADCAST_CHANNEL_BUF: usize = 16;
pub type MockMessageVersion = usize;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MockContentTopic {
pub application_name: &'static str,
pub application_name: Cow<'static, str>,
pub version: usize,
pub content_topic_name: &'static str,
pub content_topic_name: Cow<'static, str>,
}
impl MockContentTopic {
@ -35,25 +36,27 @@ impl MockContentTopic {
content_topic_name: &'static str,
) -> Self {
Self {
application_name,
application_name: Cow::Borrowed(application_name),
version,
content_topic_name,
content_topic_name: Cow::Borrowed(content_topic_name),
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MockPubSubTopic {
pub topic_name: &'static str,
pub topic_name: Cow<'static, str>,
}
impl MockPubSubTopic {
pub const fn new(topic_name: &'static str) -> Self {
Self { topic_name }
Self {
topic_name: Cow::Borrowed(topic_name),
}
}
}
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MockMessage {
pub payload: String,
@ -81,8 +84,8 @@ impl MockMessage {
}
}
pub const fn content_topic(&self) -> MockContentTopic {
self.content_topic
pub const fn content_topic(&self) -> &MockContentTopic {
&self.content_topic
}
pub fn payload(&self) -> String {
@ -92,9 +95,9 @@ impl MockMessage {
#[derive(Clone)]
pub struct Mock {
messages: Arc<Mutex<HashMap<&'static str, Vec<MockMessage>>>>,
messages: Arc<Mutex<HashMap<String, Vec<MockMessage>>>>,
message_event: Sender<NetworkEvent>,
subscribed_topics: Arc<Mutex<HashSet<&'static str>>>,
subscribed_topics: Arc<Mutex<HashSet<String>>>,
config: MockConfig,
}
@ -120,17 +123,17 @@ pub enum MockBackendMessage {
>,
},
Broadcast {
topic: &'static str,
topic: String,
msg: MockMessage,
},
RelaySubscribe {
topic: &'static str,
topic: String,
},
RelayUnSubscribe {
topic: &'static str,
topic: String,
},
Query {
topic: &'static str,
topic: String,
tx: oneshot::Sender<Vec<MockMessage>>,
},
}
@ -237,7 +240,7 @@ impl NetworkBackend for Mock {
config
.predefined_messages
.iter()
.map(|p| (p.content_topic.content_topic_name, Vec::new()))
.map(|p| (p.content_topic.content_topic_name.to_string(), Vec::new()))
.collect(),
)),
message_event,
@ -273,7 +276,7 @@ impl NetworkBackend for Mock {
}
MockBackendMessage::RelayUnSubscribe { topic } => {
tracing::info!("processed relay unsubscription for topic: {topic}");
self.subscribed_topics.lock().unwrap().remove(topic);
self.subscribed_topics.lock().unwrap().remove(&topic);
}
MockBackendMessage::Query { topic, tx } => {
tracing::info!("processed query");
@ -305,9 +308,9 @@ mod tests {
MockMessage {
payload: "foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
application_name: "mock network".into(),
version: 0,
content_topic_name: "foo",
content_topic_name: "foo".into(),
},
version: 0,
timestamp: 0,
@ -315,9 +318,9 @@ mod tests {
MockMessage {
payload: "bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
application_name: "mock network".into(),
version: 0,
content_topic_name: "bar",
content_topic_name: "bar".into(),
},
version: 0,
timestamp: 0,
@ -342,13 +345,13 @@ mod tests {
// broadcast
for val in FOO_BROADCAST_MESSAGES {
mock.process(MockBackendMessage::Broadcast {
topic: "foo",
topic: "foo".to_string(),
msg: MockMessage {
payload: val.to_string(),
content_topic: MockContentTopic {
application_name: "mock",
application_name: "mock".into(),
version: 1,
content_topic_name: "foo content",
content_topic_name: "foo content".into(),
},
version: 1,
timestamp: chrono::Utc::now().timestamp() as usize,
@ -359,13 +362,13 @@ mod tests {
for val in BAR_BROADCAST_MESSAGES {
mock.process(MockBackendMessage::Broadcast {
topic: "bar",
topic: "bar".to_string(),
msg: MockMessage {
payload: val.to_string(),
content_topic: MockContentTopic {
application_name: "mock",
application_name: "mock".into(),
version: 1,
content_topic_name: "bar content",
content_topic_name: "bar content".into(),
},
version: 1,
timestamp: chrono::Utc::now().timestamp() as usize,
@ -377,7 +380,7 @@ mod tests {
// query
let (qtx, qrx) = oneshot::channel();
mock.process(MockBackendMessage::Query {
topic: "foo",
topic: "foo".to_string(),
tx: qtx,
})
.await;
@ -388,20 +391,28 @@ mod tests {
}
// subscribe
mock.process(MockBackendMessage::RelaySubscribe { topic: "foo" })
.await;
mock.process(MockBackendMessage::RelaySubscribe { topic: "bar" })
.await;
mock.process(MockBackendMessage::RelaySubscribe {
topic: "foo".to_string(),
})
.await;
mock.process(MockBackendMessage::RelaySubscribe {
topic: "bar".to_string(),
})
.await;
assert!(mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(mock.subscribed_topics.lock().unwrap().contains("bar"));
// unsubscribe
mock.process(MockBackendMessage::RelayUnSubscribe { topic: "foo" })
.await;
mock.process(MockBackendMessage::RelayUnSubscribe {
topic: "foo".to_string(),
})
.await;
assert!(!mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(mock.subscribed_topics.lock().unwrap().contains("bar"));
mock.process(MockBackendMessage::RelayUnSubscribe { topic: "bar" })
.await;
mock.process(MockBackendMessage::RelayUnSubscribe {
topic: "bar".to_string(),
})
.await;
assert!(!mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(!mock.subscribed_topics.lock().unwrap().contains("bar"));
}