lssa/mempool/src/lib.rs

100 lines
2.5 KiB
Rust
Raw Normal View History

2025-11-18 19:31:03 +03:00
use tokio::sync::mpsc::{Receiver, Sender};
2024-10-07 13:35:29 +03:00
2025-11-18 19:31:03 +03:00
pub struct MemPool<T> {
receiver: Receiver<T>,
2024-10-07 13:35:29 +03:00
}
2025-11-18 19:31:03 +03:00
impl<T> MemPool<T> {
pub fn new(max_size: usize) -> (Self, MemPoolHandle<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(max_size);
2024-10-07 13:35:29 +03:00
2025-11-18 19:31:03 +03:00
let mem_pool = Self { receiver };
let sender = MemPoolHandle::new(sender);
(mem_pool, sender)
2024-10-07 13:35:29 +03:00
}
2025-11-18 19:31:03 +03:00
pub fn pop(&mut self) -> Option<T> {
use tokio::sync::mpsc::error::TryRecvError;
2024-10-07 13:35:29 +03:00
2025-11-18 19:31:03 +03:00
match self.receiver.try_recv() {
Ok(item) => Some(item),
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Disconnected) => {
panic!("Mempool senders disconnected, cannot receive items, this is a bug")
2024-10-07 13:35:29 +03:00
}
}
}
2025-11-18 19:31:03 +03:00
}
2024-10-07 13:35:29 +03:00
2025-11-18 19:31:03 +03:00
pub struct MemPoolHandle<T> {
sender: Sender<T>,
2024-10-07 13:35:29 +03:00
}
2025-11-18 19:31:03 +03:00
impl<T> MemPoolHandle<T> {
    /// Wraps the sending end of the channel created by [`MemPool::new`].
    fn new(sender: Sender<T>) -> Self {
        Self { sender }
    }

    /// Sends an item to the mempool, waiting asynchronously (without blocking
    /// the thread) while the pool already holds its maximum number of items.
    ///
    /// # Errors
    ///
    /// Returns a `SendError` that hands `item` back if the receiving
    /// [`MemPool`] has been dropped.
    pub async fn push(&self, item: T) -> Result<(), tokio::sync::mpsc::error::SendError<T>> {
        self.sender.send(item).await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc::error::TrySendError;
    // Shadows the built-in `#[test]` so every test below runs on a tokio runtime.
    use tokio::test;

    /// A freshly created pool holds no items.
    #[test]
    async fn test_mempool_new() {
        let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
        assert_eq!(pool.pop(), None);
    }

    /// A pushed item comes back out exactly once.
    #[test]
    async fn test_push_and_pop() {
        let (mut pool, handle) = MemPool::new(10);

        handle.push(1).await.unwrap();

        let item = pool.pop();
        assert_eq!(item, Some(1));
        assert_eq!(pool.pop(), None);
    }

    /// Items are popped in the order they were pushed.
    #[test]
    async fn test_multiple_push_pop() {
        let (mut pool, handle) = MemPool::new(10);

        handle.push(1).await.unwrap();
        handle.push(2).await.unwrap();
        handle.push(3).await.unwrap();

        assert_eq!(pool.pop(), Some(1));
        assert_eq!(pool.pop(), Some(2));
        assert_eq!(pool.pop(), Some(3));
        assert_eq!(pool.pop(), None);
    }

    /// Popping from an empty pool whose handle is still alive yields `None`.
    #[test]
    async fn test_pop_empty() {
        let (mut pool, _handle): (MemPool<u64>, _) = MemPool::new(10);
        assert_eq!(pool.pop(), None);
    }

    /// The pool never buffers more than `max_size` items.
    #[test]
    async fn test_max_size() {
        let (mut pool, handle) = MemPool::new(2);

        handle.push(1).await.unwrap();
        handle.push(2).await.unwrap();

        // The buffer is full: `push` would wait here, so probe the limit with
        // the non-waiting `try_send`, which must reject the item and return it.
        assert!(matches!(
            handle.sender.try_send(3),
            Err(TrySendError::Full(3))
        ));

        // Popping one item frees exactly one slot for the next push.
        assert_eq!(pool.pop(), Some(1));
        handle.push(3).await.unwrap();

        assert_eq!(pool.pop(), Some(2));
        assert_eq!(pool.pop(), Some(3));
        assert_eq!(pool.pop(), None);
    }

    /// Pushing after the pool was dropped fails and hands the item back.
    #[test]
    async fn test_push_after_pool_dropped() {
        let (pool, handle) = MemPool::new(10);
        drop(pool);

        let err = handle.push(1).await.unwrap_err();
        assert_eq!(err.0, 1);
    }

    /// Popping from an empty pool after its handle was dropped panics.
    #[test]
    #[should_panic(expected = "Mempool senders disconnected")]
    async fn test_pop_after_handle_dropped() {
        let (mut pool, handle): (MemPool<u64>, _) = MemPool::new(10);
        drop(handle);

        let _ = pool.pop();
    }
}