Implement #31: Mock network backend (#41)

* relax trait bounds

* mock mempool

* remove unused generic

* add mock network test case

* fix some PR comments

* simplify match branch

* finish mempool adapter example

* clippy happy

* mock consensus
This commit is contained in:
Al Liu 2023-02-01 21:58:23 +08:00 committed by GitHub
parent 567188c248
commit 3d3d2760ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 792 additions and 2 deletions

View File

@ -24,3 +24,4 @@ tracing = "0.1"
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["nomos-network/mock"]

View File

@ -0,0 +1,170 @@
use bytes::Bytes;
use futures::StreamExt;
use nomos_network::{
backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, MockMessage, NetworkEvent,
},
NetworkMsg, NetworkService,
};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use tokio_stream::{wrappers::BroadcastStream, Stream};
use crate::{
network::{
messages::{ApprovalMsg, ProposalChunkMsg},
NetworkAdapter,
},
overlay::committees::Committee,
Approval, View,
};
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock");
const MOCK_APPROVAL_CONTENT_TOPIC: MockContentTopic =
MockContentTopic::new("MockSim", 1, "MockApproval");
pub struct MockAdapter {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
}
impl MockAdapter {
async fn message_subscriber_channel(
&self,
) -> Result<
tokio::sync::broadcast::Receiver<NetworkEvent>,
tokio::sync::oneshot::error::RecvError,
> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((e, _e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
tracing::error!("error subscribing to network messages: {:?}", e);
};
receiver.await
}
}
#[async_trait::async_trait]
impl NetworkAdapter for MockAdapter {
type Backend = Mock;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self { network_relay }
}
async fn proposal_chunks_stream(
&self,
_committee: Committee,
_view: &View,
) -> Box<dyn Stream<Item = Bytes> + Send + Sync + Unpin> {
let stream_channel = self
.message_subscriber_channel()
.await
.map_err(|e| tracing::error!("handle error {e:?}"))
.unwrap();
Box::new(BroadcastStream::new(stream_channel).filter_map(|msg| {
Box::pin(async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
if MOCK_BLOCK_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ProposalChunkMsg::from_bytes(payload.as_bytes()).chunk)
} else {
None
}
}
},
Err(_e) => None,
}
})
}))
}
async fn broadcast_block_chunk(
&self,
_committee: Committee,
_view: &View,
chunk_message: ProposalChunkMsg,
) {
let message = MockMessage::new(
String::from_utf8_lossy(chunk_message.as_bytes()).to_string(),
MOCK_BLOCK_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
);
if let Err((e, _)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: message,
topic: MOCK_PUB_SUB_TOPIC,
}))
.await
{
tracing::error!("Failed to broadcast block chunk: {:?}", e);
};
}
async fn approvals_stream(
&self,
_committee: Committee,
_view: &View,
) -> Box<dyn Stream<Item = Approval> + Send> {
let stream_channel = self
.message_subscriber_channel()
.await
.unwrap_or_else(|_e| todo!("handle error"));
Box::new(
BroadcastStream::new(stream_channel).filter_map(|msg| async move {
match msg {
Ok(event) => match event {
NetworkEvent::RawMessage(message) => {
if MOCK_APPROVAL_CONTENT_TOPIC.content_topic_name
== message.content_topic().content_topic_name
{
let payload = message.payload();
Some(ApprovalMsg::from_bytes(payload.as_bytes()).approval)
} else {
None
}
}
},
Err(_e) => None,
}
}),
)
}
async fn forward_approval(
&self,
_committee: Committee,
_view: &View,
approval_message: ApprovalMsg,
) {
let message = MockMessage::new(
String::from_utf8_lossy(&approval_message.as_bytes()).to_string(),
MOCK_APPROVAL_CONTENT_TOPIC,
1,
chrono::Utc::now().timestamp() as usize,
);
if let Err((e, _e)) = self
.network_relay
.send(NetworkMsg::Process(MockBackendMessage::Broadcast {
msg: message,
topic: MOCK_PUB_SUB_TOPIC,
}))
.await
{
tracing::error!("Failed to forward approval: {:?}", e);
};
}
}

View File

@ -1,2 +1,4 @@
#[cfg(feature = "mock")]
pub mod mock;
#[cfg(feature = "waku")]
pub mod waku;

View File

@ -13,14 +13,19 @@ linked-hash-map = { version = "0.5.6", optional = true }
nomos-network = { path = "../network", features = ["waku"] }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-beta3", optional = true}
[dev-dependencies]
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["linked-hash-map"]
mock = ["linked-hash-map", "nomos-network/mock", "rand"]

View File

@ -0,0 +1,91 @@
use nomos_network::{
backends::mock::{
EventKind, Mock, MockBackendMessage, MockConfig, MockContentTopic, MockMessage,
},
NetworkConfig, NetworkMsg, NetworkService,
};
use overwatch_derive::*;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::mock::MockAdapter, MempoolService,
};
#[derive(Services)]
struct MockPoolNode {
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter<String>, MockPool<String, String>>>,
}
fn main() {
tracing_subscriber::fmt::fmt()
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()))
.with_file(false)
.init();
let app = OverwatchRunner::<MockPoolNode>::run(
MockPoolNodeServiceSettings {
network: NetworkConfig {
backend: MockConfig {
predefined_messages: vec![
MockMessage {
payload: "This is foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "foo",
},
version: 0,
timestamp: 0,
},
MockMessage {
payload: "This is bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "bar",
},
version: 0,
timestamp: 0,
},
],
duration: tokio::time::Duration::from_secs(1),
seed: 0,
version: 1,
weights: None,
},
},
mockpool: (),
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap();
let network = app.handle().relay::<NetworkService<Mock>>();
app.spawn(async {
let outbound = network.connect().await.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
outbound
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender: tx,
})
.await
.unwrap();
let _rx = rx.await.unwrap();
outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: "foo",
}))
.await
.unwrap();
outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: "bar",
}))
.await
.unwrap();
});
app.wait_finished();
}

View File

@ -0,0 +1,98 @@
// std
use std::marker::PhantomData;
// crates
use futures::{Stream, StreamExt};
use nomos_network::backends::mock::{EventKind, Mock, MockBackendMessage, NetworkEvent};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::NetworkAdapter;
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_CONTENT_TOPIC: &str = "MockContentTopic";
pub struct MockAdapter<Tx> {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
}
#[async_trait::async_trait]
impl<Tx> NetworkAdapter for MockAdapter<Tx>
where
Tx: From<String> + DeserializeOwned + Send + Sync + 'static,
{
type Backend = Mock;
type Tx = Tx;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
// send message to boot the network producer
if let Err(e) = network_relay
.send(NetworkMsg::Process(MockBackendMessage::BootProducer {
spawner: Box::new(move |fut| {
tokio::spawn(fut);
Ok(())
}),
}))
.await
{
panic!(
"Couldn't send boot producer message to the network service: {:?}",
e.0
);
}
if let Err((e, _)) = network_relay
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: MOCK_PUB_SUB_TOPIC,
}))
.await
{
panic!(
"Couldn't send subscribe message to the network service: {}",
e
);
};
Self {
network_relay,
_tx: Default::default(),
}
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
let (sender, receiver) = tokio::sync::oneshot::channel();
if let Err((_, e)) = self
.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
{
tracing::error!(err = ?e);
};
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
|event| async move {
match event {
Ok(NetworkEvent::RawMessage(message)) => {
tracing::info!("Received message: {:?}", message.payload());
if message.content_topic().content_topic_name == MOCK_CONTENT_TOPIC {
Some(Tx::from(message.payload()))
} else {
None
}
}
Err(_e) => None,
}
},
)))
}
}

View File

@ -1,2 +1,5 @@
#[cfg(feature = "waku")]
pub mod waku;
#[cfg(feature = "mock")]
pub mod mock;

View File

@ -8,19 +8,27 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
bytes = "1.2"
chrono = { version = "0.4", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
multiaddr = "0.15"
serde = "1.0"
sscanf = { version = "0.4", optional = true }
sled = { version = "0.34", optional = true }
tokio = { version = "1", features = ["sync"] }
thiserror = "1.0"
tracing = "0.1"
rand = { version = "0.8", optional = true }
waku-bindings = { version = "0.1.0-beta3", optional = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }
tracing-gelf = "0.7"
futures = "0.3"
parking_lot = "0.12"
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
[features]
default = []
waku = ["waku-bindings"]
waku = ["waku-bindings"]
mock = ["rand", "chrono"]

View File

@ -0,0 +1,409 @@
// internal
use super::*;
use futures::future::BoxFuture;
// crates
use overwatch_rs::services::state::NoState;
use rand::{
distributions::{Distribution, WeightedIndex},
rngs::StdRng,
SeedableRng,
};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, Mutex},
};
use tokio::sync::broadcast::{self, Receiver, Sender};
use tracing::debug;
const BROADCAST_CHANNEL_BUF: usize = 16;
pub type MockMessageVersion = usize;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MockContentTopic {
pub application_name: &'static str,
pub version: usize,
pub content_topic_name: &'static str,
}
impl MockContentTopic {
pub const fn new(
application_name: &'static str,
version: usize,
content_topic_name: &'static str,
) -> Self {
Self {
application_name,
version,
content_topic_name,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MockPubSubTopic {
pub topic_name: &'static str,
}
impl MockPubSubTopic {
pub const fn new(topic_name: &'static str) -> Self {
Self { topic_name }
}
}
#[derive(Clone, PartialEq, Eq, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MockMessage {
pub payload: String,
/// The content topic to be set on the message
pub content_topic: MockContentTopic,
/// The Mock Message version number
#[serde(default)]
pub version: MockMessageVersion,
/// Unix timestamp in nanoseconds
pub timestamp: usize,
}
impl MockMessage {
pub const fn new(
payload: String,
content_topic: MockContentTopic,
version: MockMessageVersion,
timestamp: usize,
) -> Self {
Self {
payload,
content_topic,
version,
timestamp,
}
}
pub const fn content_topic(&self) -> MockContentTopic {
self.content_topic
}
pub fn payload(&self) -> String {
self.payload.clone()
}
}
#[derive(Clone)]
pub struct Mock {
messages: Arc<Mutex<HashMap<&'static str, Vec<MockMessage>>>>,
message_event: Sender<NetworkEvent>,
subscribed_topics: Arc<Mutex<HashSet<&'static str>>>,
config: MockConfig,
}
#[derive(Clone, Debug, Serialize)]
pub struct MockConfig {
pub predefined_messages: Vec<MockMessage>,
pub duration: std::time::Duration,
pub seed: u64,
pub version: usize,
pub weights: Option<Vec<usize>>,
}
pub enum MockBackendMessage {
BootProducer {
#[allow(clippy::type_complexity)]
spawner: Box<
dyn Fn(
BoxFuture<'static, Result<(), overwatch_rs::DynError>>,
) -> Result<(), overwatch_rs::DynError>
+ Send
+ Sync
+ 'static,
>,
},
Broadcast {
topic: &'static str,
msg: MockMessage,
},
RelaySubscribe {
topic: &'static str,
},
RelayUnSubscribe {
topic: &'static str,
},
Query {
topic: &'static str,
tx: oneshot::Sender<Vec<MockMessage>>,
},
}
impl core::fmt::Debug for MockBackendMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::BootProducer { .. } => write!(f, "BootProducer"),
Self::Broadcast { topic, msg } => {
write!(f, "Broadcast {{ topic: {}, msg: {:?} }}", topic, msg)
}
Self::RelaySubscribe { topic } => write!(f, "RelaySubscribe {{ topic: {} }}", topic),
Self::RelayUnSubscribe { topic } => {
write!(f, "RelayUnSubscribe {{ topic: {} }}", topic)
}
Self::Query { topic, .. } => write!(f, "Query {{ topic: {} }}", topic),
}
}
}
#[derive(Debug)]
pub enum EventKind {
Message,
}
#[derive(Debug, Clone)]
pub enum NetworkEvent {
RawMessage(MockMessage),
}
impl Mock {
/// Run producer message handler
pub async fn run_producer_handler(&self) -> Result<(), overwatch_rs::DynError> {
match &self.config.weights {
// if user provides weights, then we send the predefined messages according to the weights endlessly
Some(weights) => self.run_endless_producer(weights).await,
// if user do not provide weights, then we just send the predefined messages one by one in order
None => self.run_in_order_producer().await,
}
}
async fn run_endless_producer(&self, weights: &[usize]) -> Result<(), overwatch_rs::DynError> {
let dist = WeightedIndex::new(weights.iter())?;
let mut rng = StdRng::seed_from_u64(self.config.seed);
loop {
let idx = dist.sample(&mut rng);
tokio::time::sleep(self.config.duration).await;
let msg = &self.config.predefined_messages[idx];
match self
.message_event
.send(NetworkEvent::RawMessage(msg.clone()))
{
Ok(_) => {
tracing::debug!(
"sent message to \"{}\" to topic {}",
msg.payload,
msg.content_topic.content_topic_name
);
}
Err(e) => {
tracing::error!("error sending message: {:?}", e);
}
};
}
}
async fn run_in_order_producer(&self) -> Result<(), overwatch_rs::DynError> {
for msg in &self.config.predefined_messages {
tokio::time::sleep(self.config.duration).await;
match self
.message_event
.send(NetworkEvent::RawMessage(msg.clone()))
{
Ok(_) => {
tracing::debug!(
"sent message \"{}\" to topic {}",
msg.payload,
msg.content_topic.content_topic_name
);
}
Err(e) => {
tracing::error!("error sending message: {:?}", e);
}
};
}
Ok(())
}
}
#[async_trait::async_trait]
impl NetworkBackend for Mock {
type Settings = MockConfig;
type State = NoState<MockConfig>;
type Message = MockBackendMessage;
type EventKind = EventKind;
type NetworkEvent = NetworkEvent;
fn new(config: Self::Settings) -> Self {
let message_event = broadcast::channel(BROADCAST_CHANNEL_BUF).0;
Self {
subscribed_topics: Arc::new(Mutex::new(HashSet::new())),
messages: Arc::new(Mutex::new(
config
.predefined_messages
.iter()
.map(|p| (p.content_topic.content_topic_name, Vec::new()))
.collect(),
)),
message_event,
config,
}
}
async fn process(&self, msg: Self::Message) {
match msg {
MockBackendMessage::BootProducer { spawner } => {
tracing::info!("booting producer");
let this = self.clone();
match (spawner)(Box::pin(async move { this.run_producer_handler().await })) {
Ok(_) => {}
Err(e) => {
tracing::error!("error booting producer: {:?}", e);
}
}
}
MockBackendMessage::Broadcast { topic, msg } => {
tracing::info!("processed normal message");
self.messages
.lock()
.unwrap()
.entry(topic)
.or_insert_with(Vec::new)
.push(msg.clone());
let _ = self.message_event.send(NetworkEvent::RawMessage(msg));
}
MockBackendMessage::RelaySubscribe { topic } => {
tracing::info!("processed relay subscription for topic: {topic}");
self.subscribed_topics.lock().unwrap().insert(topic);
}
MockBackendMessage::RelayUnSubscribe { topic } => {
tracing::info!("processed relay unsubscription for topic: {topic}");
self.subscribed_topics.lock().unwrap().remove(topic);
}
MockBackendMessage::Query { topic, tx } => {
tracing::info!("processed query");
let normal_msgs = self.messages.lock().unwrap();
let msgs = normal_msgs.get(&topic).cloned().unwrap_or_default();
drop(normal_msgs);
let _ = tx.send(msgs);
}
};
}
async fn subscribe(&mut self, kind: Self::EventKind) -> Receiver<Self::NetworkEvent> {
match kind {
EventKind::Message => {
debug!("processed subscription to incoming messages");
self.message_event.subscribe()
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mock_network() {
let config = MockConfig {
predefined_messages: vec![
MockMessage {
payload: "foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "foo",
},
version: 0,
timestamp: 0,
},
MockMessage {
payload: "bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "bar",
},
version: 0,
timestamp: 0,
},
],
duration: tokio::time::Duration::from_secs(1),
seed: 0,
version: 1,
weights: None,
};
let mock = Arc::new(Mock::new(config));
// run producer
let task = mock.clone();
tokio::spawn(async move {
task.run_producer_handler().await.unwrap();
});
static FOO_BROADCAST_MESSAGES: &[&str] = &["foo1", "foo2"];
static BAR_BROADCAST_MESSAGES: &[&str] = &["bar1"];
// broadcast
for val in FOO_BROADCAST_MESSAGES {
mock.process(MockBackendMessage::Broadcast {
topic: "foo",
msg: MockMessage {
payload: val.to_string(),
content_topic: MockContentTopic {
application_name: "mock",
version: 1,
content_topic_name: "foo content",
},
version: 1,
timestamp: chrono::Utc::now().timestamp() as usize,
},
})
.await;
}
for val in BAR_BROADCAST_MESSAGES {
mock.process(MockBackendMessage::Broadcast {
topic: "bar",
msg: MockMessage {
payload: val.to_string(),
content_topic: MockContentTopic {
application_name: "mock",
version: 1,
content_topic_name: "bar content",
},
version: 1,
timestamp: chrono::Utc::now().timestamp() as usize,
},
})
.await;
}
// query
let (qtx, qrx) = oneshot::channel();
mock.process(MockBackendMessage::Query {
topic: "foo",
tx: qtx,
})
.await;
let query_result = qrx.await.unwrap();
assert_eq!(query_result.len(), 2);
for idx in 0..FOO_BROADCAST_MESSAGES.len() {
assert_eq!(&query_result[idx].payload, FOO_BROADCAST_MESSAGES[idx]);
}
// subscribe
mock.process(MockBackendMessage::RelaySubscribe { topic: "foo" })
.await;
mock.process(MockBackendMessage::RelaySubscribe { topic: "bar" })
.await;
assert!(mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(mock.subscribed_topics.lock().unwrap().contains("bar"));
// unsubscribe
mock.process(MockBackendMessage::RelayUnSubscribe { topic: "foo" })
.await;
assert!(!mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(mock.subscribed_topics.lock().unwrap().contains("bar"));
mock.process(MockBackendMessage::RelayUnSubscribe { topic: "bar" })
.await;
assert!(!mock.subscribed_topics.lock().unwrap().contains("foo"));
assert!(!mock.subscribed_topics.lock().unwrap().contains("bar"));
}
}

View File

@ -5,6 +5,9 @@ use tokio::sync::broadcast::Receiver;
#[cfg(feature = "waku")]
pub mod waku;
#[cfg(feature = "mock")]
pub mod mock;
#[async_trait::async_trait]
pub trait NetworkBackend {
type Settings: Clone + Debug + Send + Sync + 'static;