Improve Mock network (#78)

* add MockTxId and send back transaction response messages
This commit is contained in:
Al Liu 2023-02-21 16:10:26 +08:00 committed by GitHub
parent 7f7a0db88a
commit 7f609db62a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 39 deletions

View File

@ -12,11 +12,14 @@ authors = [
async-trait = { version = "0.1" }
bytes = "1.3"
futures = "0.3"
nomos-network = { path = "../nomos-services/network", optional = true }
raptorq = { version = "1.7", optional = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
bincode = "1.3"
once_cell = "1.0"
blake2 = { version = "0.10", optional = true }
serde_json = { version = "1", optional = true }
[dev-dependencies]
rand = "0.8"
@ -26,4 +29,4 @@ tokio = { version = "1.23", features = ["macros", "rt"] }
[features]
default = []
raptor = ["raptorq"]
mock = []
mock = ["nomos-network/mock", "blake2", "serde_json"]

23
nomos-core/src/tx/mock.rs Normal file
View File

@ -0,0 +1,23 @@
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
#[derive(Clone, Debug, Eq, PartialEq, Hash, serde::Serialize)]
pub enum MockTransactionMsg {
Request(nomos_network::backends::mock::MockMessage),
Response(nomos_network::backends::mock::MockMessage),
}
#[derive(Debug, Eq, Hash, PartialEq, Ord, Clone, PartialOrd)]
pub struct MockTxId([u8; 32]);
impl From<&MockTransactionMsg> for MockTxId {
fn from(tx: &MockTransactionMsg) -> Self {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(serde_json::to_string(tx).unwrap().as_bytes());
let mut id = [0u8; 32];
hasher.finalize_variable(&mut id).unwrap();
Self(id)
}
}

View File

@ -1,4 +1,7 @@
#[cfg(feature = "mock")]
pub mod mock;
mod transaction;
use serde::{Deserialize, Serialize};
pub use transaction::Transaction;

View File

@ -26,8 +26,9 @@ waku-bindings = { version = "0.1.0-beta4", optional = true}
nomos-log = { path = "../log" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tokio = { version = "1", features = ["full"] }
blake2 = "0.10"
[features]
default = []
waku = ["nomos-network/waku", "waku-bindings"]
mock = ["linked-hash-map", "nomos-network/mock", "rand"]
mock = ["linked-hash-map", "nomos-network/mock", "rand", "nomos-core/mock"]

View File

@ -1,15 +1,14 @@
// std
use std::marker::PhantomData;
// crates
use futures::{Stream, StreamExt};
use nomos_core::tx::mock::MockTransactionMsg;
use nomos_network::backends::mock::{
EventKind, Mock, MockBackendMessage, MockContentTopic, NetworkEvent,
};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use tokio_stream::wrappers::BroadcastStream;
// internal
@ -19,18 +18,14 @@ pub const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
pub const MOCK_CONTENT_TOPIC: &str = "MockContentTopic";
pub const MOCK_TX_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("Mock", 1, "Tx");
pub struct MockAdapter<Tx> {
pub struct MockAdapter {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
}
#[async_trait::async_trait]
impl<Tx> NetworkAdapter for MockAdapter<Tx>
where
Tx: From<String> + Into<String> + DeserializeOwned + Send + Sync + 'static,
{
impl NetworkAdapter for MockAdapter {
type Backend = Mock;
type Tx = Tx;
type Tx = MockTransactionMsg;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
@ -59,10 +54,7 @@ where
{
panic!("Couldn't send subscribe message to the network service: {e}",);
};
Self {
network_relay,
_tx: Default::default(),
}
Self { network_relay }
}
async fn transactions_stream(&self) -> Box<dyn Stream<Item = Self::Tx> + Unpin + Send> {
@ -84,10 +76,10 @@ where
match event {
Ok(NetworkEvent::RawMessage(message)) => {
tracing::info!("Received message: {:?}", message.payload());
if message.content_topic().content_topic_name == MOCK_CONTENT_TOPIC {
Some(Tx::from(message.payload()))
if message.content_topic() == MOCK_TX_CONTENT_TOPIC {
Some(MockTransactionMsg::Request(message))
} else {
None
Some(MockTransactionMsg::Response(message))
}
}
Err(_e) => None,

View File

@ -1,7 +1,10 @@
use nomos_core::block::BlockId;
use nomos_core::{
block::BlockId,
tx::mock::{MockTransactionMsg, MockTxId},
};
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
backends::mock::{Mock, MockBackendMessage, MockConfig, MockContentTopic, MockMessage},
backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage},
NetworkConfig, NetworkMsg, NetworkService,
};
use overwatch_derive::*;
@ -9,7 +12,7 @@ use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::mock::{MockAdapter, MOCK_CONTENT_TOPIC},
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
MempoolMsg, MempoolService,
};
@ -17,7 +20,7 @@ use nomos_mempool::{
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter<String>, MockPool<String, String>>>,
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTxId, MockTransactionMsg>>>,
}
#[test]
@ -28,21 +31,13 @@ fn test_mockmempool() {
let predefined_messages = vec![
MockMessage {
payload: "This is foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: MOCK_CONTENT_TOPIC,
},
content_topic: MOCK_TX_CONTENT_TOPIC,
version: 0,
timestamp: 0,
},
MockMessage {
payload: "This is bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: MOCK_CONTENT_TOPIC,
},
content_topic: MOCK_TX_CONTENT_TOPIC,
version: 0,
timestamp: 0,
},
@ -50,7 +45,7 @@ fn test_mockmempool() {
let exp_txns = predefined_messages
.iter()
.map(|msg| msg.payload.clone())
.cloned()
.collect::<std::collections::HashSet<_>>();
let app = OverwatchRunner::<MockPoolNode>::run(
@ -75,7 +70,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter<String>, MockPool<String, String>>>();
.relay::<MempoolService<MockAdapter, MockPool<MockTxId, MockTransactionMsg>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();
@ -84,7 +79,7 @@ fn test_mockmempool() {
// subscribe to the mock content topic
network_outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: MOCK_CONTENT_TOPIC,
topic: MOCK_TX_CONTENT_TOPIC.content_topic_name,
}))
.await
.unwrap();
@ -105,6 +100,13 @@ fn test_mockmempool() {
.await
.unwrap()
.into_iter()
.filter_map(|tx| {
if let MockTransactionMsg::Request(msg) = tx {
Some(msg)
} else {
None
}
})
.collect::<std::collections::HashSet<_>>();
if items.len() == exp_txns.len() {

View File

@ -21,7 +21,7 @@ const BROADCAST_CHANNEL_BUF: usize = 16;
pub type MockMessageVersion = usize;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct MockContentTopic {
pub application_name: &'static str,
pub version: usize,
@ -53,7 +53,7 @@ impl MockPubSubTopic {
}
}
#[derive(Clone, PartialEq, Eq, Serialize, Debug)]
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MockMessage {
pub payload: String,
@ -98,7 +98,7 @@ pub struct Mock {
config: MockConfig,
}
#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug)]
pub struct MockConfig {
pub predefined_messages: Vec<MockMessage>,
pub duration: std::time::Duration,
@ -279,7 +279,6 @@ impl NetworkBackend for Mock {
tracing::info!("processed query");
let normal_msgs = self.messages.lock().unwrap();
let msgs = normal_msgs.get(&topic).cloned().unwrap_or_default();
drop(normal_msgs);
let _ = tx.send(msgs);
}
};