use TimedCache to avoid OOM

This commit is contained in:
Youngjoon Lee 2024-11-08 15:37:00 +07:00
parent c220497ffb
commit 28a714b3c9
No known key found for this signature in database
GPG Key ID: 25CA11F37F095E5D
3 changed files with 17 additions and 6 deletions

View File

@ -26,3 +26,4 @@ multiaddr = "0.18"
sha2 = "0.10"
uuid = { version = "1", features = ["fast-rng", "v4"] }
tracing-appender = "0.2"
cached = "0.54.0"

View File

@ -76,7 +76,7 @@ pub fn config_tracing(
}
tracing_subscriber::registry()
.with(LevelFilter::from(Level::DEBUG))
.with(LevelFilter::from(Level::INFO))
.with(layers)
.init();

View File

@ -6,6 +6,7 @@ pub mod state;
pub mod stream_wrapper;
use crate::node::mix::consensus_streams::{Epoch, Slot};
use cached::{Cached, TimedCache};
use crossbeam::channel;
use futures::Stream;
use lottery::StakeLottery;
@ -35,7 +36,7 @@ use scheduler::{Interval, TemporalRelease};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use state::MixnodeState;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::{pin::pin, task::Poll, time::Duration};
use stream_wrapper::CrossbeamReceiverStream;
@ -70,7 +71,7 @@ pub struct MixNode {
state: MixnodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
message_cache: HashSet<Sha256Hash>,
message_cache: TimedCache<Sha256Hash, ()>,
data_msg_lottery_update_time_sender: channel::Sender<Duration>,
data_msg_lottery_interval: Interval,
@ -185,7 +186,8 @@ impl MixNode {
Self {
id,
network_interface,
message_cache: HashSet::new(),
// We're not coupling this lifespan with the steps now, but it's okay
message_cache: TimedCache::with_lifespan(60),
settings,
state: MixnodeState {
node_id: id,
@ -210,7 +212,11 @@ impl MixNode {
}
fn forward(&mut self, message: MixMessage, exclude_node: Option<NodeId>) {
if !self.message_cache.insert(Self::sha256(&message.0)) {
if self
.message_cache
.cache_set(Self::sha256(&message.0), ())
.is_some()
{
return;
}
for node_id in self
@ -229,7 +235,11 @@ impl MixNode {
.receive_messages()
.into_iter()
// Retain only messages that have not been seen before
.filter(|msg| self.message_cache.insert(Self::sha256(&msg.payload().0)))
.filter(|msg| {
self.message_cache
.cache_set(Self::sha256(&msg.payload().0), ())
.is_none()
})
.collect()
}