Detach tx broadcast from mempool (#69)

* Remove send tx method from mempool network adapter

* Add error reporting to add_tx operation in mempool
Delegate broadcasting to external caller
This commit is contained in:
Daniel Sanchez 2023-02-08 11:07:09 +01:00 committed by GitHub
parent bbb783e1da
commit 320755d19d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 78 additions and 65 deletions

View File

@ -9,6 +9,7 @@ edition = "2021"
blake2 = "0.10" blake2 = "0.10"
bincode = "2.0.0-rc.2" bincode = "2.0.0-rc.2"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
chrono = "0.4"
futures = "0.3" futures = "0.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
@ -25,3 +26,4 @@ serde_json = "1.0"
serde_yaml = "0.9" serde_yaml = "0.9"
color-eyre = "0.6.0" color-eyre = "0.6.0"
serde = "1" serde = "1"
waku-bindings = "0.1.0-beta3"

View File

@ -1,21 +1,24 @@
// std // std
// crates // crates
use tokio::sync::oneshot;
use tracing::debug;
// internal
use crate::tx::{Tx, TxId};
use futures::future::join_all; use futures::future::join_all;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use nomos_core::wire;
use nomos_http::backends::axum::AxumBackend; use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest}; use nomos_http::http::{HttpMethod, HttpRequest};
use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::waku::WakuAdapter; use nomos_mempool::network::adapters::waku::{
WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
};
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo}; use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo};
use nomos_network::{NetworkMsg, NetworkService}; use nomos_network::{NetworkMsg, NetworkService};
use tokio::sync::oneshot; use overwatch_rs::services::relay::OutboundRelay;
use tracing::debug; use waku_bindings::WakuMessage;
// internal
use crate::tx::{Tx, TxId};
pub fn mempool_metrics_bridge( pub fn mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle, handle: overwatch_rs::overwatch::handle::OverwatchHandle,
@ -73,11 +76,25 @@ pub fn mempool_add_tx_bridge(
.as_ref() .as_ref()
.and_then(|b| String::from_utf8(b.to_vec()).ok()) .and_then(|b| String::from_utf8(b.to_vec()).ok())
{ {
let tx = Tx(data);
let (sender, receiver) = oneshot::channel();
mempool_channel mempool_channel
.send(MempoolMsg::AddTx { tx: Tx(data) }) .send(MempoolMsg::AddTx {
tx: tx.clone(),
reply_channel: sender,
})
.await .await
.unwrap(); .unwrap();
res_tx.send(b"".to_vec().into()).await.unwrap(); if let Ok(()) = receiver.await.unwrap() {
// broadcast transaction to peers
let network_relay = handle
.relay::<NetworkService<Waku>>()
.connect()
.await
.unwrap();
send_transaction(network_relay, tx).await;
res_tx.send(b"".to_vec().into()).await.unwrap();
}
} else { } else {
debug!( debug!(
"Invalid payload, {:?}. Empty or couldn't transform into a utf8 String", "Invalid payload, {:?}. Empty or couldn't transform into a utf8 String",
@ -161,3 +178,21 @@ pub fn waku_add_conn_bridge(
Ok(()) Ok(())
})) }))
} }
async fn send_transaction(network_relay: OutboundRelay<NetworkMsg<Waku>>, tx: Tx) {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
if let Err((_, _e)) = network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message: WakuMessage::new(
payload,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now().timestamp() as usize,
),
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}

View File

@ -45,7 +45,10 @@ impl<Tx, Id> Leadership<Tx, Id> {
let ancestor_hint = todo!("get the ancestor from the tip"); let ancestor_hint = todo!("get the ancestor from the tip");
if view.is_leader(self.key.key) { if view.is_leader(self.key.key) {
let (tx, rx) = tokio::sync::oneshot::channel(); let (tx, rx) = tokio::sync::oneshot::channel();
self.mempool.send(MempoolMsg::View { ancestor_hint, tx }); self.mempool.send(MempoolMsg::View {
ancestor_hint,
reply_channel: tx,
});
let _iter = rx.await; let _iter = rx.await;
LeadershipResult::Leader { LeadershipResult::Leader {

View File

@ -18,7 +18,7 @@ use nomos_network::{
use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto); WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
const APPLICATION_NAME: &str = "CarnotSim"; const APPLICATION_NAME: &str = "CarnotSim";

View File

@ -16,6 +16,7 @@ nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
rand = { version = "0.8", optional = true } rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
tracing = "0.1" tracing = "0.1"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1" tokio-stream = "0.1"

View File

@ -4,7 +4,7 @@ use std::collections::BTreeMap;
use std::hash::Hash; use std::hash::Hash;
// crates // crates
// internal // internal
use crate::backend::MemPool; use crate::backend::{MemPool, MempoolError};
use nomos_core::block::{BlockHeader, BlockId}; use nomos_core::block::{BlockHeader, BlockId};
/// A mock mempool implementation that stores all transactions in memory in the order received. /// A mock mempool implementation that stores all transactions in memory in the order received.
@ -49,10 +49,10 @@ where
Self::new() Self::new()
} }
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError> { fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError> {
let id = Id::from(&tx); let id = Id::from(&tx);
if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) { if self.pending_txs.contains_key(&id) || self.in_block_txs_by_id.contains_key(&id) {
return Ok(()); return Err(MempoolError::ExistingTx);
} }
self.pending_txs.insert(id, tx); self.pending_txs.insert(id, tx);
Ok(()) Ok(())

View File

@ -3,6 +3,14 @@ pub mod mockpool;
use nomos_core::block::{BlockHeader, BlockId}; use nomos_core::block::{BlockHeader, BlockId};
#[derive(thiserror::Error, Debug)]
pub enum MempoolError {
#[error("Tx already in mempool")]
ExistingTx,
#[error(transparent)]
DynamicPoolError(#[from] overwatch_rs::DynError),
}
pub trait MemPool { pub trait MemPool {
type Settings: Clone; type Settings: Clone;
type Tx; type Tx;
@ -12,7 +20,7 @@ pub trait MemPool {
fn new(settings: Self::Settings) -> Self; fn new(settings: Self::Settings) -> Self;
/// Add a new transaction to the mempool, for example because we received it from the network /// Add a new transaction to the mempool, for example because we received it from the network
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError>; fn add_tx(&mut self, tx: Self::Tx) -> Result<(), MempoolError>;
/// Return a view over the transactions contained in the mempool. /// Return a view over the transactions contained in the mempool.
/// Implementations should provide *at least* all the transactions which have not been marked as /// Implementations should provide *at least* all the transactions which have not been marked as

View File

@ -7,7 +7,6 @@ use std::fmt::{Debug, Error, Formatter};
// crates // crates
use futures::StreamExt; use futures::StreamExt;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
// internal // internal
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use backend::MemPool; use backend::MemPool;
@ -40,10 +39,11 @@ pub struct MempoolMetrics {
pub enum MempoolMsg<Tx, Id> { pub enum MempoolMsg<Tx, Id> {
AddTx { AddTx {
tx: Tx, tx: Tx,
reply_channel: Sender<Result<(), ()>>,
}, },
View { View {
ancestor_hint: BlockId, ancestor_hint: BlockId,
tx: Sender<Box<dyn Iterator<Item = Tx> + Send>>, reply_channel: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
}, },
Prune { Prune {
ids: Vec<Id>, ids: Vec<Id>,
@ -63,7 +63,7 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
Self::View { ancestor_hint, .. } => { Self::View { ancestor_hint, .. } => {
write!(f, "MempoolMsg::View {{ ancestor_hint: {ancestor_hint:?}}}") write!(f, "MempoolMsg::View {{ ancestor_hint: {ancestor_hint:?}}}")
} }
Self::AddTx { tx } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"), Self::AddTx { tx, .. } => write!(f, "MempoolMsg::AddTx{{tx: {tx:?}}}"),
Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {ids:?}}}"), Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {ids:?}}}"),
Self::MarkInBlock { ids, block } => { Self::MarkInBlock { ids, block } => {
write!( write!(
@ -131,18 +131,20 @@ where
tokio::select! { tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => { Some(msg) = service_state.inbound_relay.recv() => {
match msg { match msg {
MempoolMsg::AddTx { tx } => { MempoolMsg::AddTx { tx, reply_channel } => {
match pool.add_tx(tx.clone()) { match pool.add_tx(tx.clone()) {
Ok(_id) => { Ok(_id) => {
adapter.send_transaction(tx).await; if let Err(e) = reply_channel.send(Ok(())) {
tracing::debug!("Failed to send reply to AddTx: {:?}", e);
}
} }
Err(e) => { Err(e) => {
tracing::debug!("could not add tx to the pool due to: {}", e); tracing::debug!("could not add tx to the pool due to: {}", e);
} }
} }
} }
MempoolMsg::View { ancestor_hint, tx } => { MempoolMsg::View { ancestor_hint, reply_channel } => {
tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { reply_channel.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
tracing::debug!("could not send back pool view") tracing::debug!("could not send back pool view")
}); });
} }

View File

@ -4,7 +4,7 @@ use std::marker::PhantomData;
// crates // crates
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use nomos_network::backends::mock::{ use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent, EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent,
}; };
use nomos_network::{NetworkMsg, NetworkService}; use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
@ -95,22 +95,4 @@ where
}, },
))) )))
} }
async fn send_transaction(&self, tx: Self::Tx) {
if let Err((e, _e)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: MockMessage::new(
tx.into(),
MOCK_TX_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
),
topic: MOCK_PUB_SUB_TOPIC,
}))
.await
{
tracing::error!(err = ?e);
};
}
} }

View File

@ -13,12 +13,12 @@ use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
use serde::Serialize; use serde::Serialize;
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic}; use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic};
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = pub const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto); WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic = pub const WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto); WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto);
pub struct WakuAdapter<Tx> { pub struct WakuAdapter<Tx> {
@ -83,23 +83,4 @@ where
}, },
))) )))
} }
async fn send_transaction(&self, tx: Self::Tx) {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message: WakuMessage::new(
payload,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now().timestamp() as usize,
),
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
} }

View File

@ -19,5 +19,4 @@ pub trait NetworkAdapter {
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self; ) -> Self;
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>; async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>;
async fn send_transaction(&self, tx: Self::Tx);
} }

View File

@ -96,7 +96,7 @@ fn test_mockmempool() {
mempool_outbound mempool_outbound
.send(MempoolMsg::View { .send(MempoolMsg::View {
ancestor_hint: BlockId, ancestor_hint: BlockId,
tx: mtx, reply_channel: mtx,
}) })
.await .await
.unwrap(); .unwrap();