Mempool network adapter (#30)

* Refactor mempool naming and added settings to backend trait

* Implement mempool networking traits and waku backend

* Transaction networking plumbing in mempool service

* Make TransactionMsg generic
Use bincode to deserialize tx messages

* Make wakuadapter generic over tx and tx-id

* Fix wrong backend type bound

* Adapt to waku beta2

* Thread tx and id together for adapter and pool

* Panic on subscribing error

* Prune unnecessary Id type bounds

* Remove transaction placeholder

* Remove Id bound from waku adapter

* Remove empty transactions module
This commit is contained in:
Daniel Sanchez 2023-01-09 17:22:46 +01:00 committed by GitHub
parent 664cb3ccd8
commit 9721e6f5fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 208 additions and 60 deletions

View File

@ -13,6 +13,7 @@ async-trait = { version = "0.1" }
bytes = "1.3"
futures = "0.3"
raptorq = { version = "1.7", optional = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
[dev-dependencies]
@ -22,4 +23,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] }
[features]
default = []
raptor = ["raptorq"]
raptor = ["raptorq"]

View File

@ -72,25 +72,23 @@ impl NetworkAdapter for WakuAdapter {
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(
ProposalChunkMsg::from_bytes::<CHUNK_SIZE>(
payload.try_into().unwrap(),
)
.chunk,
Ok(NetworkEvent::RawMessage(message)) => {
// TODO: this should actually check the whole content topic,
// waiting for this [PR](https://github.com/waku-org/waku-rust-bindings/pull/28)
if WAKU_CARNOT_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(
ProposalChunkMsg::from_bytes::<CHUNK_SIZE>(
payload.try_into().unwrap(),
)
} else {
None
}
.chunk,
)
} else {
None
}
},
}
Err(_e) => None,
}
}),

View File

@ -103,7 +103,8 @@ impl ServiceCore for Logger {
async fn run(self) -> Result<(), overwatch_rs::DynError> {
// keep the handle alive without stressing the runtime
Ok(futures::pending!())
futures::pending!();
Ok(())
}
}

View File

@ -7,9 +7,18 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
bincode = { version = "2.0.0-rc.2", features = ["serde"] }
futures = "0.3"
nomos-network = { path = "../network", features = ["waku"] }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
futures = "0.3"
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-beta1", optional = true}
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]

View File

@ -1,14 +1,15 @@
use nomos_core::block::{BlockHeader, BlockId};
pub trait Pool {
pub trait MemPool {
type Settings;
type Tx;
type Id;
/// Construct a new empty pool
fn new() -> Self;
fn new(settings: Self::Settings) -> Self;
/// Add a new transaction to the mempool, for example because we received it from the network
fn add_tx(&mut self, tx: Self::Tx, id: Self::Id) -> Result<(), overwatch_rs::DynError>;
fn add_tx(&mut self, tx: Self::Tx) -> Result<(), overwatch_rs::DynError>;
/// Return a view over the transactions contained in the mempool.
/// Implementations should provide *at least* all the transactions which have not been marked as

View File

@ -1,18 +1,18 @@
/// std
pub mod backend;
pub mod network;
// std
use std::fmt::{Debug, Error, Formatter};
/// crates
use tokio::sync::broadcast::Receiver;
// crates
use futures::StreamExt;
use tokio::sync::oneshot::Sender;
/// internal
pub mod backend;
use backend::Pool;
// internal
use crate::network::NetworkAdapter;
use backend::MemPool;
use nomos_core::block::{BlockHeader, BlockId};
use nomos_network::{
backends::{waku::NetworkEvent, NetworkBackend},
NetworkService,
};
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay, RelayMessage},
@ -20,19 +20,21 @@ use overwatch_rs::services::{
ServiceCore, ServiceData, ServiceId,
};
pub struct Mempool<N: NetworkBackend + Send + Sync + 'static, P: Pool + Send + Sync + 'static>
where
pub struct MempoolService<
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
> where
P::Settings: Clone + Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N>>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
}
pub enum MempoolMsg<Tx, Id> {
AddTx {
id: Id,
tx: Tx,
},
View {
@ -58,7 +60,7 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
ancestor_hint
)
}
Self::AddTx { id, tx } => write!(f, "MempoolMsg::AddTx{{id: {:?}, tx: {:?}}}", id, tx),
Self::AddTx { tx } => write!(f, "MempoolMsg::AddTx{{tx: {:?}}}", tx),
Self::Prune { ids } => write!(f, "MempoolMsg::Prune{{ids: {:?}}}", ids),
Self::MarkInBlock { ids, block } => {
write!(
@ -73,43 +75,45 @@ impl<Tx: Debug, Id: Debug> Debug for MempoolMsg<Tx, Id> {
impl<Tx: 'static, Id: 'static> RelayMessage for MempoolMsg<Tx, Id> {}
impl<N, P> ServiceData for Mempool<N, P>
impl<N, P> ServiceData for MempoolService<N, P>
where
N: NetworkBackend + Send + Sync + 'static,
P: Pool + Send + Sync + 'static,
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
{
const SERVICE_ID: ServiceId = "Mempool";
type Settings = ();
type Settings = P::Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as Pool>::Tx, <P as Pool>::Id>;
type Message = MempoolMsg<<P as MemPool>::Tx, <P as MemPool>::Id>;
}
#[async_trait::async_trait]
impl<N, P> ServiceCore for Mempool<N, P>
impl<N, P> ServiceCore for MempoolService<N, P>
where
P: Pool + Send + Sync + 'static,
P: MemPool + Send + Sync + 'static,
P::Settings: Clone + Send + Sync + 'static,
P::Id: Debug + Send + Sync + 'static,
P::Tx: Debug + Send + Sync + 'static,
N: NetworkBackend + Send + Sync + 'static,
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let pool_settings = service_state.settings_reader.get_updated_settings();
Ok(Self {
service_state,
network_relay,
pool: P::new(),
pool: P::new(pool_settings),
})
}
#[allow(unreachable_code, unused, clippy::diverging_sub_expression)]
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state,
mut service_state,
network_relay,
pool,
mut pool,
} = self;
let network_relay: OutboundRelay<_> = network_relay
@ -117,32 +121,33 @@ where
.await
.expect("Relay connection with NetworkService should succeed");
// Separate function so that we can specialize it for different network backends
let network_txs: Receiver<NetworkEvent> = todo!();
let adapter = N::new(network_relay).await;
let mut network_txs = adapter.transactions_stream().await;
loop {
tokio::select! {
Some(msg) = service_state.inbound_relay.recv() => {
match msg {
MempoolMsg::AddTx { id, tx } => {
pool.add_tx(tx, id).unwrap_or_else(|e| {
MempoolMsg::AddTx { tx } => {
pool.add_tx(tx).unwrap_or_else(|e| {
tracing::debug!("could not add tx to the pool due to: {}", e)
});
}
MempoolMsg::View { ancestor_hint, rx } => {
rx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
tracing::debug!("could not send back pool view")
})
});
}
MempoolMsg::MarkInBlock { ids, block } => {
pool.mark_in_block(ids, block);
}
MempoolMsg::Prune { ids } => pool.prune(ids),
MempoolMsg::Prune { ids } => { pool.prune(ids); },
}
}
Ok(msg) = network_txs.recv() => {
// filter incoming transactions and add them to the pool
todo!()
Some(tx) = network_txs.next() => {
pool.add_tx(tx).unwrap_or_else(|e| {
tracing::debug!("could not add tx to the pool due to: {}", e)
});
}
}
}

View File

@ -0,0 +1,2 @@
#[cfg(feature = "waku")]
pub mod waku;

View File

@ -0,0 +1,100 @@
use bincode::config::{Fixint, LittleEndian, NoLimit, WriteFixedArrayLength};
use std::marker::PhantomData;
// std
// crates
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::messages::TransactionMsg;
use crate::network::NetworkAdapter;
use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic};
static WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
static WAKU_CARNOT_TX_CONTENT_TOPIC: WakuContentTopic =
WakuContentTopic::new("CarnotSim", 1, "CarnotTx", Encoding::Proto);
pub struct WakuAdapter<Tx> {
network_relay: OutboundRelay<<NetworkService<Waku> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
}
#[async_trait::async_trait]
impl<Tx> NetworkAdapter for WakuAdapter<Tx>
where
Tx: DeserializeOwned + Send + Sync + 'static,
{
type Backend = Waku;
type Tx = Tx;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
// Subscribe to the carnot pubsub topic
if let Err((e, _)) = network_relay
.send(NetworkMsg::Process(WakuBackendMessage::RelaySubscribe {
topic: WAKU_CARNOT_PUB_SUB_TOPIC.clone(),
}))
.await
{
// We panic, but as we could try to reconnect later it should not be
// a problem. But definitely something to consider.
panic!(
"Couldn't send subscribe message to the network service: {}",
e
);
};
Self {
network_relay,
_tx: Default::default(),
}
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
todo!("log error");
};
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
|event| async move {
match event {
Ok(NetworkEvent::RawMessage(message)) => {
if message.content_topic().content_topic_name
== WAKU_CARNOT_TX_CONTENT_TOPIC.content_topic_name
{
let (tx, _): (TransactionMsg<Self::Tx>, _) =
// TODO: This should be temporary, we can probably extract this so we can use/try/test a variety of encodings
bincode::serde::decode_from_slice(
message.payload(),
bincode::config::Configuration::<
LittleEndian,
Fixint,
WriteFixedArrayLength,
NoLimit,
>::default(),
)
.unwrap();
Some(tx.tx)
} else {
None
}
}
Err(_e) => None,
}
},
)))
}
}

View File

@ -0,0 +1,9 @@
// std
// crates
use serde::{Deserialize, Serialize};
// internal
#[derive(Serialize, Deserialize)]
pub struct TransactionMsg<Tx> {
pub tx: Tx,
}

View File

@ -0,0 +1,22 @@
pub mod adapters;
mod messages;
// std
// crates
use futures::Stream;
// internal
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + Send + Sync + 'static;
type Tx: Send + Sync + 'static;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>;
}