1
0
mirror of synced 2025-02-05 04:14:07 +00:00
Youngjoon Lee 8449c81d0f
Mixnet PoC base branch (#316)
* Add `mixnode` and `mixnet-client` crate (#302)

* Add `mixnode` binary (#317)

* Integrate mixnet with libp2p network backend (#318)

* Fix #312: proper delays (#321)

* proper delays

* add missing duration param

* tiny fix: compilation error caused by `rand` 0.8 -> 0.7

* use `get_available_port()` for mixnet integration tests (#333)

* add missing comments

* Overwatch mixnet node (#339)

* Add mixnet service and overwatch app

* remove #[tokio::main]

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

* fix tests for the overwatch mixnode (#342)

* fix panic when a corner case happens in RandomDelayIter (#335)

* Use `log` service for `mixnode` bin (#341)

* Use `wire` for MixnetMessage in libp2p (#347)

* Prevent mixnet tests from running forever (#363)

* Use random delay when sending msgs to mixnet (#362)

* fix a minor compilation error caused by the latest master

* Fix run output fd (#343)

* add a connection pool

* Exp backoff (#332)

* move mixnet listening into separate task

* add exponential retry for insufficient peers in libp2p

* fix logging

* Fix MutexGuard across await (#373)

* Fix MutexGuard across await

Holding a MutexGuard across an await point is not a good idea.
Removing that solves the issues we had with the mixnet test

* Make mixnode handle bodies coming from the same source concurrently (#372)

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

* Move wait at network startup (#338)

We now wait after the call to 'subscribe' to give the network
the time to register peers in the mesh before starting to
publish messages

* Remove unused functions from mixnet connpool (#374)

* Mixnet benchmark (#375)

* merge fixes

* add `connection_pool_size` field to `config.yaml`

* Simplify mixnet topology (#393)

* Simplify bytes and duration range ser/de (#394)

* optimize bytes serde and duration serde

---------

Co-authored-by: Al Liu <scygliu1@gmail.com>
Co-authored-by: Daniel Sanchez <sanchez.quiros.daniel@gmail.com>
Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
2023-09-14 17:38:47 +09:00

114 lines
3.5 KiB
Rust

use nomos_core::{block::BlockId, tx::mock::MockTransaction};
use nomos_log::{Logger, LoggerSettings};
use nomos_network::{
backends::mock::{Mock, MockBackendMessage, MockConfig, MockMessage},
NetworkConfig, NetworkMsg, NetworkService,
};
use overwatch_derive::*;
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::mock::{MockAdapter, MOCK_TX_CONTENT_TOPIC},
MempoolMsg, MempoolService,
};
// Service set used by the mempool integration test in this file: a logger,
// a network service on the `Mock` backend, and a mempool service backed by
// `MockPool` and fed through `MockAdapter`.
// NOTE(review): the `Services` derive presumably generates the
// `MockPoolNodeServiceSettings` struct constructed in `test_mockmempool`, with
// one settings field per service declared here — confirm against overwatch_derive.
#[derive(Services)]
struct MockPoolNode {
// Handle to the logging service.
logging: ServiceHandle<Logger>,
// Handle to the network service running on the mock backend.
network: ServiceHandle<NetworkService<Mock>>,
// Handle to the mempool service storing `MockTransaction<MockMessage>` items.
mockpool: ServiceHandle<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>,
}
/// Checks that the messages configured on the mock network backend end up in
/// the mock mempool.
///
/// The test starts an overwatch application made of the logger, the mock
/// network service and the mock mempool service, subscribes to the mock
/// transaction topic, and then polls the mempool view until it holds as many
/// items as there are predefined messages, at which point the two sets are
/// compared for equality.
///
/// # Panics
///
/// Panics if the application fails to start, or if the mempool does not
/// report the expected transactions within `TEST_TIMEOUT`.
#[test]
fn test_mockmempool() {
    // Upper bound on how long the test thread waits for the spawned task.
    // Without it, a panic inside the task (failed `unwrap`/`assert_eq!`) or a
    // mempool that never fills up would make the test hang forever instead of
    // failing.
    const TEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    // Completion flag shared between the spawned task and the test thread.
    let exist = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let exist2 = exist.clone();

    let predefined_messages = vec![
        MockMessage {
            payload: "This is foo".to_string(),
            content_topic: MOCK_TX_CONTENT_TOPIC,
            version: 0,
            timestamp: 0,
        },
        MockMessage {
            payload: "This is bar".to_string(),
            content_topic: MOCK_TX_CONTENT_TOPIC,
            version: 0,
            timestamp: 0,
        },
    ];

    // The set of messages the mempool is expected to contain eventually.
    let exp_txns = predefined_messages
        .iter()
        .cloned()
        .collect::<std::collections::HashSet<_>>();

    let app = OverwatchRunner::<MockPoolNode>::run(
        MockPoolNodeServiceSettings {
            network: NetworkConfig {
                backend: MockConfig {
                    predefined_messages,
                    duration: tokio::time::Duration::from_millis(100),
                    seed: 0,
                    version: 1,
                    weights: None,
                },
            },
            mockpool: (),
            logging: LoggerSettings::default(),
        },
        None,
    )
    .map_err(|e| eprintln!("Error encountered: {}", e))
    .unwrap();

    let network = app.handle().relay::<NetworkService<Mock>>();
    let mempool = app
        .handle()
        .relay::<MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>>>>();

    app.spawn(async move {
        let network_outbound = network.connect().await.unwrap();
        let mempool_outbound = mempool.connect().await.unwrap();

        // subscribe to the mock content topic
        network_outbound
            .send(NetworkMsg::Process(MockBackendMessage::RelaySubscribe {
                topic: MOCK_TX_CONTENT_TOPIC.content_topic_name.to_string(),
            }))
            .await
            .unwrap();

        // Poll the mempool once per second until it holds as many items as
        // expected, then verify the contents and signal completion.
        loop {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

            let (mtx, mrx) = tokio::sync::oneshot::channel();
            mempool_outbound
                .send(MempoolMsg::View {
                    ancestor_hint: BlockId::default(),
                    reply_channel: mtx,
                })
                .await
                .unwrap();

            let items = mrx
                .await
                .unwrap()
                .map(|msg| msg.message().clone())
                .collect::<std::collections::HashSet<_>>();

            if items.len() == exp_txns.len() {
                assert_eq!(exp_txns, items);
                exist.store(true, std::sync::atomic::Ordering::SeqCst);
                break;
            }
        }
    });

    // Block the test thread until the task reports success, failing the test
    // once the deadline has passed rather than spinning indefinitely.
    let started = std::time::Instant::now();
    while !exist2.load(std::sync::atomic::Ordering::SeqCst) {
        assert!(
            started.elapsed() < TEST_TIMEOUT,
            "mempool did not contain the expected transactions within {:?}",
            TEST_TIMEOUT
        );
        std::thread::sleep(std::time::Duration::from_millis(200));
    }
}