nomos-node/mixnet/util/src/lib.rs

30 lines
818 B
Rust
Raw Normal View History

Mixnet PoC base branch (#316) * Add `mixnode` and `mixnet-client` crate (#302) * Add `mixnode` binary (#317) * Integrate mixnet with libp2p network backend (#318) * Fix #312: proper delays (#321) * proper delays * add missing duration param * tiny fix: compilation error caused by `rand` 0.8 -> 0.7 * use `get_available_port()` for mixnet integration tests (#333) * add missing comments * Overwatch mixnet node (#339) * Add mixnet service and overwatch app * remove #[tokio::main] --------- Co-authored-by: Youngjoon Lee <taxihighway@gmail.com> * fix tests for the overwatch mixnode (#342) * fix panic when corner case happen in RandomDelayIter (#335) * Use `log` service for `mixnode` bin (#341) * Use `wire` for MixnetMessage in libp2p (#347) * Prevent tmixnet tests from running forever (#363) * Use random delay when sending msgs to mixnet (#362) * fix a minor compilation error caused by the latest master * Fix run output fd (#343) * add a connection pool * Exp backoff (#332) * move mixnet listening into separate task * add exponential retry for insufficient peers in libp2p * fix logging * Fix MutexGuard across await (#373) * Fix MutexGuard across await Holding a MutexGuard across an await point is not a good idea. Removing that solves the issues we had with the mixnet test * Make mixnode handle bodies coming from the same source concurrently (#372) --------- Co-authored-by: Youngjoon Lee <taxihighway@gmail.com> * Move wait at network startup (#338) We now wait after the call to 'subscribe' to give the network the time to register peers in the mesh before starting to publish messages * Remove unused functions from mixnet connpool (#374) * Mixnet benchmark (#375) * merge fixes * add `connection_pool_size` field to `config.yaml` * Simplify mixnet topology (#393) * Simplify bytes and duration range ser/de (#394) * optimize bytes serde and duration serde --------- Co-authored-by: Al Liu <scygliu1@gmail.com> Co-authored-by: Daniel Sanchez <sanchez.quiros.daniel@gmail.com> Co-authored-by: Giacomo Pasini <Zeegomo@users.noreply.github.com>
2023-09-14 08:38:47 +00:00
use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
#[derive(Clone)]
pub struct ConnectionPool {
pool: Arc<Mutex<HashMap<SocketAddr, Arc<Mutex<TcpStream>>>>>,
}
impl ConnectionPool {
pub fn new(size: usize) -> Self {
Self {
pool: Arc::new(Mutex::new(HashMap::with_capacity(size))),
}
}
pub async fn get_or_init(&self, addr: &SocketAddr) -> std::io::Result<Arc<Mutex<TcpStream>>> {
let mut pool = self.pool.lock().await;
match pool.get(addr).cloned() {
Some(tcp) => Ok(tcp),
None => {
let tcp = Arc::new(Mutex::new(TcpStream::connect(addr).await?));
pool.insert(*addr, tcp.clone());
Ok(tcp)
}
}
}
}