Network transaction broadcasting (#63)

* Broadcast transaction when validated in mempool

* Clippy happy

* Use standard bincode config for tx message decoding

* add send_transaction for mock (#64)

* add send_transaction for mock

* Use wire instead of direct bincode

* Use wire instead of direct bincode on deserialization

---------

Co-authored-by: Al Liu <scygliu1@gmail.com>
This commit is contained in:
Daniel Sanchez 2023-02-06 12:49:24 +01:00 committed by GitHub
parent 4705cc213b
commit c5ac1db44c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 33 deletions

View File

@ -8,6 +8,7 @@ edition = "2021"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
bincode = { version = "2.0.0-rc.2", features = ["serde"] } bincode = { version = "2.0.0-rc.2", features = ["serde"] }
chrono = "0.4"
futures = "0.3" futures = "0.3"
linked-hash-map = { version = "0.5.6", optional = true } linked-hash-map = { version = "0.5.6", optional = true }
nomos-network = { path = "../network", features = ["waku"] } nomos-network = { path = "../network", features = ["waku"] }

View File

@ -99,7 +99,7 @@ where
P: MemPool + Send + 'static, P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static, P::Settings: Clone + Send + Sync + 'static,
P::Id: Debug + Send + 'static, P::Id: Debug + Send + 'static,
P::Tx: Debug + Send + 'static, P::Tx: Clone + Debug + Send + Sync + 'static,
N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static, N: NetworkAdapter<Tx = P::Tx> + Send + Sync + 'static,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
@ -132,9 +132,14 @@ where
Some(msg) = service_state.inbound_relay.recv() => { Some(msg) = service_state.inbound_relay.recv() => {
match msg { match msg {
MempoolMsg::AddTx { tx } => { MempoolMsg::AddTx { tx } => {
pool.add_tx(tx).unwrap_or_else(|e| { match pool.add_tx(tx.clone()) {
tracing::debug!("could not add tx to the pool due to: {}", e) Ok(_id) => {
}); adapter.send_transaction(tx).await;
}
Err(e) => {
tracing::debug!("could not add tx to the pool due to: {}", e);
}
}
} }
MempoolMsg::View { ancestor_hint, tx } => { MempoolMsg::View { ancestor_hint, tx } => {
tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {

View File

@ -3,7 +3,9 @@ use std::marker::PhantomData;
// crates // crates
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use nomos_network::backends::mock::{EventKind, Mock, MockBackendMessage, NetworkEvent}; use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent,
};
use nomos_network::{NetworkMsg, NetworkService}; use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
@ -15,6 +17,7 @@ use crate::network::NetworkAdapter;
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_CONTENT_TOPIC: &str = "MockContentTopic"; const MOCK_CONTENT_TOPIC: &str = "MockContentTopic";
const MOCK_TX_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("Mock", 1, "Tx");
pub struct MockAdapter<Tx> { pub struct MockAdapter<Tx> {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
@ -24,7 +27,7 @@ pub struct MockAdapter<Tx> {
#[async_trait::async_trait] #[async_trait::async_trait]
impl<Tx> NetworkAdapter for MockAdapter<Tx> impl<Tx> NetworkAdapter for MockAdapter<Tx>
where where
Tx: From<String> + DeserializeOwned + Send + Sync + 'static, Tx: From<String> + Into<String> + DeserializeOwned + Send + Sync + 'static,
{ {
type Backend = Mock; type Backend = Mock;
type Tx = Tx; type Tx = Tx;
@ -54,10 +57,7 @@ where
})) }))
.await .await
{ {
panic!( panic!("Couldn't send subscribe message to the network service: {e}",);
"Couldn't send subscribe message to the network service: {}",
e
);
}; };
Self { Self {
network_relay, network_relay,
@ -95,4 +95,22 @@ where
}, },
))) )))
} }
async fn send_transaction(&self, tx: Self::Tx) {
if let Err((e, _e)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: MockMessage::new(
tx.into(),
MOCK_TX_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
),
topic: MOCK_PUB_SUB_TOPIC,
}))
.await
{
tracing::error!(err = ?e);
};
}
} }

View File

@ -1,6 +1,5 @@
use bincode::config::{Fixint, LittleEndian, NoLimit, WriteFixedArrayLength};
use std::marker::PhantomData;
// std // std
use std::marker::PhantomData;
// crates // crates
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@ -8,11 +7,13 @@ use tokio_stream::wrappers::BroadcastStream;
// internal // internal
use crate::network::messages::TransactionMsg; use crate::network::messages::TransactionMsg;
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage}; use nomos_network::backends::waku::{EventKind, NetworkEvent, Waku, WakuBackendMessage};
use nomos_network::{NetworkMsg, NetworkService}; use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
use waku_bindings::{Encoding, WakuContentTopic, WakuPubSubTopic}; use serde::Serialize;
use waku_bindings::{Encoding, WakuContentTopic, WakuMessage, WakuPubSubTopic};
const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic = const WAKU_CARNOT_PUB_SUB_TOPIC: WakuPubSubTopic =
WakuPubSubTopic::new("CarnotSim", Encoding::Proto); WakuPubSubTopic::new("CarnotSim", Encoding::Proto);
@ -28,7 +29,7 @@ pub struct WakuAdapter<Tx> {
#[async_trait::async_trait] #[async_trait::async_trait]
impl<Tx> NetworkAdapter for WakuAdapter<Tx> impl<Tx> NetworkAdapter for WakuAdapter<Tx>
where where
Tx: DeserializeOwned + Send + Sync + 'static, Tx: DeserializeOwned + Serialize + Send + Sync + 'static,
{ {
type Backend = Waku; type Backend = Waku;
type Tx = Tx; type Tx = Tx;
@ -69,21 +70,9 @@ where
|event| async move { |event| async move {
match event { match event {
Ok(NetworkEvent::RawMessage(message)) => { Ok(NetworkEvent::RawMessage(message)) => {
if message.content_topic().content_topic_name if message.content_topic() == &WAKU_CARNOT_TX_CONTENT_TOPIC {
== WAKU_CARNOT_TX_CONTENT_TOPIC.content_topic_name let tx: TransactionMsg<Self::Tx> =
{ wire::deserializer(message.payload()).deserialize().unwrap();
let (tx, _): (TransactionMsg<Self::Tx>, _) =
// TODO: This should be temporary, we can probably extract this so we can use/try/test a variety of encodings
bincode::serde::decode_from_slice(
message.payload(),
bincode::config::Configuration::<
LittleEndian,
Fixint,
WriteFixedArrayLength,
NoLimit,
>::default(),
)
.unwrap();
Some(tx.tx) Some(tx.tx)
} else { } else {
None None
@ -94,4 +83,23 @@ where
}, },
))) )))
} }
async fn send_transaction(&self, tx: Self::Tx) {
let payload = wire::serialize(&tx).expect("Tx serialization failed");
if let Err((_, _e)) = self
.network_relay
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
message: WakuMessage::new(
payload,
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
1,
chrono::Utc::now().timestamp() as usize,
),
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
}))
.await
{
todo!("log error");
};
}
} }

View File

@ -19,4 +19,5 @@ pub trait NetworkAdapter {
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self; ) -> Self;
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>; async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send>;
async fn send_transaction(&self, tx: Self::Tx);
} }

View File

@ -140,13 +140,13 @@ impl core::fmt::Debug for MockBackendMessage {
match self { match self {
Self::BootProducer { .. } => write!(f, "BootProducer"), Self::BootProducer { .. } => write!(f, "BootProducer"),
Self::Broadcast { topic, msg } => { Self::Broadcast { topic, msg } => {
write!(f, "Broadcast {{ topic: {}, msg: {:?} }}", topic, msg) write!(f, "Broadcast {{ topic: {topic}, msg: {msg:?} }}")
} }
Self::RelaySubscribe { topic } => write!(f, "RelaySubscribe {{ topic: {} }}", topic), Self::RelaySubscribe { topic } => write!(f, "RelaySubscribe {{ topic: {topic} }}"),
Self::RelayUnSubscribe { topic } => { Self::RelayUnSubscribe { topic } => {
write!(f, "RelayUnSubscribe {{ topic: {} }}", topic) write!(f, "RelayUnSubscribe {{ topic: {topic} }}")
} }
Self::Query { topic, .. } => write!(f, "Query {{ topic: {} }}", topic), Self::Query { topic, .. } => write!(f, "Query {{ topic: {topic} }}"),
} }
} }
} }