// Daniel Sanchez Quiros 221cbd254a Fix tests
// 2024-11-07 12:03:07 +07:00
//
// 215 lines
// 6.5 KiB
// Rust

use super::{Receivers, Subscriber};
use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use serde::{Deserialize, Serialize};
use std::{
fs::{File, OpenOptions},
io::Write,
path::PathBuf,
sync::{Arc, Mutex},
};
/// Configuration for [`SettingsSubscriber`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SettingsSubscriberSettings {
    /// Location of the file the subscriber opens and writes its records to.
    pub path: PathBuf,
}
impl Default for SettingsSubscriberSettings {
    /// Points the subscriber at a file named `simulation` with the `conf`
    /// extension inside the system temporary directory.
    fn default() -> Self {
        let path = std::env::temp_dir()
            .join("simulation")
            .with_extension("conf");
        Self { path }
    }
}
/// Subscriber that serializes the records it receives into a single file.
#[derive(Debug)]
pub struct SettingsSubscriber<R> {
    /// Output file; every write goes through the mutex.
    file: Arc<Mutex<File>>,
    /// The record channel and the stop-request channel this subscriber reads.
    recvs: Arc<Receivers<R>>,
}
impl<R> Subscriber for SettingsSubscriber<R>
where
R: crate::output_processors::Record + Serialize,
{
type Record = R;
type Settings = SettingsSubscriberSettings;
fn new(
record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<Sender<()>>,
settings: Self::Settings,
) -> anyhow::Result<Self>
where
Self: Sized,
{
let mut opts = OpenOptions::new();
let recvs = Receivers {
stop_rx: stop_recv,
recv: record_recv,
};
let this = SettingsSubscriber {
file: Arc::new(Mutex::new(
opts.truncate(true)
.create(true)
.read(true)
.write(true)
.open(&settings.path)?,
)),
recvs: Arc::new(recvs),
};
tracing::info!(
target = "simulation",
"subscribed to {}",
settings.path.display()
);
Ok(this)
}
fn next(&self) -> Option<anyhow::Result<Arc<Self::Record>>> {
Some(self.recvs.recv.recv().map_err(From::from))
}
fn run(self) -> anyhow::Result<()> {
crossbeam::select! {
recv(self.recvs.stop_rx) -> finish_tx => {
// collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
}
recv(self.recvs.recv) -> msg => {
self.sink(msg?)?;
}
}
Ok(())
}
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {
let mut file = self.file.lock().expect("failed to lock file");
serde_json::to_writer(&mut *file, &state)?;
file.write_all(b",\n")?;
Ok(())
}
fn subscribe_data_type() -> RecordType {
RecordType::Data
}
}
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, HashSet},
        time::Duration,
    };

    use super::*;
    use crate::{
        network::{
            behaviour::NetworkBehaviour,
            regions::{Region, RegionsData},
            Network, NetworkBehaviourKey,
        },
        node::{
            dummy_streaming::{DummyStreamingNode, DummyStreamingState},
            Node, NodeId, NodeIdExt,
        },
        output_processors::OutData,
        runner::SimulationRunner,
        warding::SimulationState,
    };

    // Not referenced anywhere else in this module.
    type View = usize;

    /// Record holding the ids of the nodes present in a simulation state.
    ///
    /// It is not used by `test_streaming` below.
    #[derive(Debug, Clone, Serialize)]
    struct SettingsRecord {
        states: HashSet<NodeId>,
    }

    impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for SettingsRecord {
        type Error = anyhow::Error;

        /// Collects the id of every node in `value.nodes`.
        fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
            let states = value.nodes.read().iter().map(|node| node.id()).collect();
            Ok(Self { states })
        }
    }

    /// Runs a six-node simulation, one node per region, and checks that
    /// building and running it succeeds.
    #[test]
    fn test_streaming() {
        type BoxedNode =
            Box<dyn Node<State = DummyStreamingState, Settings = ()> + Send + Sync>;

        // Region assigned to node index 0, 1, ..., 5, in that order.
        let region_order = [
            Region::Europe,
            Region::NorthAmerica,
            Region::SouthAmerica,
            Region::Asia,
            Region::Africa,
            Region::Australia,
        ];

        let simulation_settings = crate::settings::SimulationSettings {
            seed: Some(1),
            ..Default::default()
        };

        let nodes = (0..6)
            .map(|idx| {
                Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) as BoxedNode
            })
            .collect::<Vec<_>>();

        let regions_data = RegionsData {
            regions: (0..6)
                .zip(region_order.iter().copied())
                .map(|(idx, region)| (region, vec![NodeId::from_index(idx)]))
                .collect(),
            node_region: (0..6)
                .zip(region_order.iter().copied())
                .map(|(idx, region)| (NodeId::from_index(idx), region))
                .collect(),
            // Only same-region links are configured.
            region_network_behaviour: region_order
                .iter()
                .copied()
                .map(|region| {
                    (
                        NetworkBehaviourKey::new(region, region),
                        NetworkBehaviour {
                            delay: Duration::from_millis(100),
                            drop: 0.0,
                        },
                    )
                })
                .collect(),
        };
        let network = Network::new(regions_data, 0);

        let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
            SimulationRunner::new(network, nodes, Default::default(), simulation_settings)
                .unwrap();
        simulation_runner.simulate().unwrap();
    }
}