Mock mempool integration test (#66)

* finish mock mempool integration test

* use Log service for test

* remove unused example

* use millis and merge log PR
This commit is contained in:
Al Liu 2023-02-07 17:13:22 +08:00 committed by GitHub
parent c5ac1db44c
commit 146001c9fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 95 deletions

View File

@ -22,9 +22,9 @@ tokio-stream = "0.1"
waku-bindings = { version = "0.1.0-beta3", optional = true}
[dev-dependencies]
nomos-log = { path = "../log" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tokio = { version = "1", features = ["full"] }
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
[features]
default = []

View File

@ -1,91 +0,0 @@
use nomos_network::{
backends::mock::{
EventKind, Mock, MockBackendMessage, MockConfig, MockContentTopic, MockMessage,
},
NetworkConfig, NetworkMsg, NetworkService,
};
use overwatch_derive::*;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::mock::MockAdapter, MempoolService,
};
#[derive(Services)]
struct MockPoolNode {
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter<String>, MockPool<String, String>>>,
}
fn main() {
tracing_subscriber::fmt::fmt()
.with_env_filter(std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()))
.with_file(false)
.init();
let app = OverwatchRunner::<MockPoolNode>::run(
MockPoolNodeServiceSettings {
network: NetworkConfig {
backend: MockConfig {
predefined_messages: vec![
MockMessage {
payload: "This is foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "foo",
},
version: 0,
timestamp: 0,
},
MockMessage {
payload: "This is bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: "bar",
},
version: 0,
timestamp: 0,
},
],
duration: tokio::time::Duration::from_secs(1),
seed: 0,
version: 1,
weights: None,
},
},
mockpool: (),
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap();
let network = app.handle().relay::<NetworkService<Mock>>();
app.spawn(async {
let outbound = network.connect().await.unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
outbound
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender: tx,
})
.await
.unwrap();
let _rx = rx.await.unwrap();
outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: "foo",
}))
.await
.unwrap();
outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: "bar",
}))
.await
.unwrap();
});
app.wait_finished();
}

View File

@ -15,9 +15,9 @@ use tokio_stream::wrappers::BroadcastStream;
// internal
use crate::network::NetworkAdapter;
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_CONTENT_TOPIC: &str = "MockContentTopic";
const MOCK_TX_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("Mock", 1, "Tx");
pub const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
pub const MOCK_CONTENT_TOPIC: &str = "MockContentTopic";
pub const MOCK_TX_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("Mock", 1, "Tx");
pub struct MockAdapter<Tx> {
network_relay: OutboundRelay<<NetworkService<Mock> as ServiceData>::Message>,

View File

@ -0,0 +1,121 @@
use nomos_core::block::BlockId;
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
backends::mock::{Mock, MockBackendMessage, MockConfig, MockContentTopic, MockMessage},
NetworkConfig, NetworkMsg, NetworkService,
};
use overwatch_derive::*;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::mock::{MockAdapter, MOCK_CONTENT_TOPIC},
MempoolMsg, MempoolService,
};
#[derive(Services)]
struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<MempoolService<MockAdapter<String>, MockPool<String, String>>>,
}
#[test]
fn test_mockmempool() {
let exist = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let exist2 = exist.clone();
let predefined_messages = vec![
MockMessage {
payload: "This is foo".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: MOCK_CONTENT_TOPIC,
},
version: 0,
timestamp: 0,
},
MockMessage {
payload: "This is bar".to_string(),
content_topic: MockContentTopic {
application_name: "mock network",
version: 0,
content_topic_name: MOCK_CONTENT_TOPIC,
},
version: 0,
timestamp: 0,
},
];
let exp_txns = predefined_messages
.iter()
.map(|msg| msg.payload.clone())
.collect::<std::collections::HashSet<_>>();
let app = OverwatchRunner::<MockPoolNode>::run(
MockPoolNodeServiceSettings {
network: NetworkConfig {
backend: MockConfig {
predefined_messages,
duration: tokio::time::Duration::from_millis(100),
seed: 0,
version: 1,
weights: None,
},
},
mockpool: (),
logging: LoggerSettings::default(),
},
None,
)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap();
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app
.handle()
.relay::<MempoolService<MockAdapter<String>, MockPool<String, String>>>();
app.spawn(async move {
let network_outbound = network.connect().await.unwrap();
let mempool_outbound = mempool.connect().await.unwrap();
// subscribe to the mock content topic
network_outbound
.send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
topic: MOCK_CONTENT_TOPIC,
}))
.await
.unwrap();
// try to wait all txs to be stored in mempool
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let (mtx, mrx) = tokio::sync::oneshot::channel();
mempool_outbound
.send(MempoolMsg::View {
ancestor_hint: BlockId,
tx: mtx,
})
.await
.unwrap();
let items = mrx
.await
.unwrap()
.into_iter()
.collect::<std::collections::HashSet<_>>();
if items.len() == exp_txns.len() {
assert_eq!(exp_txns, items);
exist.store(true, std::sync::atomic::Ordering::SeqCst);
break;
}
}
});
while !exist2.load(std::sync::atomic::Ordering::SeqCst) {
std::thread::sleep(std::time::Duration::from_millis(200));
}
}