diff --git a/network-runner/config/mixnode.json b/network-runner/config/mixnode.json index b2b494a..957af74 100644 --- a/network-runner/config/mixnode.json +++ b/network-runner/config/mixnode.json @@ -23,7 +23,7 @@ "step_time": "10ms", "runner_settings": "Sync", "stream_settings": { - "path": "test.csv" + "path": "test.json" }, "node_count": 3, "seed": 0, diff --git a/network-runner/src/bin/app/main.rs b/network-runner/src/bin/app/main.rs index bc0008a..e58781f 100644 --- a/network-runner/src/bin/app/main.rs +++ b/network-runner/src/bin/app/main.rs @@ -10,11 +10,10 @@ use crossbeam::channel; use nomos_simulations_network_runner::network::behaviour::create_behaviours; use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData}; use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network}; -use nomos_simulations_network_runner::node::mix::{ - MixMessage, MixNode, MixNodeState, MixnodeSettings, -}; +use nomos_simulations_network_runner::node::mix::state::{MixnodeRecord, MixnodeState}; +use nomos_simulations_network_runner::node::mix::{MixMessage, MixNode, MixnodeSettings}; use nomos_simulations_network_runner::node::{NodeId, NodeIdExt}; -use nomos_simulations_network_runner::output_processors::{OutData, Record}; +use nomos_simulations_network_runner::output_processors::Record; use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle}; #[cfg(feature = "polars")] use nomos_simulations_network_runner::streaming::polars::PolarsSubscriber; @@ -113,7 +112,7 @@ fn create_boxed_mixnode( simulation_settings: SimulationSettings, no_netcap: bool, mixnode_settings: MixnodeSettings, -) -> BoxedNode { +) -> BoxedNode { let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded(); let (node_message_sender, node_message_receiver) = channel::unbounded(); // Dividing milliseconds in second by milliseconds in the step. @@ -156,22 +155,26 @@ where T: Serialize + Clone + 'static, { let stream_settings = settings.stream_settings.clone(); - let runner = - SimulationRunner::<_, OutData, S, T>::new(network, nodes, Default::default(), settings)?; + let runner = SimulationRunner::<_, MixnodeRecord, S, T>::new( + network, + nodes, + Default::default(), + settings, + )?; let handle = match stream_type { Some(StreamType::Naive) => { let settings = stream_settings.unwrap_naive(); - runner.simulate_and_subscribe::>(settings)? + runner.simulate_and_subscribe::>(settings)? } Some(StreamType::IO) => { let settings = stream_settings.unwrap_io(); - runner.simulate_and_subscribe::>(settings)? + runner.simulate_and_subscribe::>(settings)? } #[cfg(feature = "polars")] Some(StreamType::Polars) => { let settings = stream_settings.unwrap_polars(); - runner.simulate_and_subscribe::>(settings)? + runner.simulate_and_subscribe::>(settings)? } None => runner.simulate()?, }; diff --git a/network-runner/src/node/mix/mod.rs b/network-runner/src/node/mix/mod.rs index 21330cf..2d32508 100644 --- a/network-runner/src/node/mix/mod.rs +++ b/network-runner/src/node/mix/mod.rs @@ -1,13 +1,11 @@ +pub mod state; + use super::{Node, NodeId}; use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use state::MixnodeState; use std::time::Duration; -#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)] -pub struct MixNodeState { - pub mock_counter: usize, -} - #[derive(Debug, Clone)] pub enum MixMessage { Dummy(String), @@ -19,6 +17,7 @@ impl PayloadSize for MixMessage { } } +#[derive(Clone, Default, Deserialize)] pub struct MixnodeSettings { pub connected_peers: Vec, } @@ -26,7 +25,7 @@ pub struct MixnodeSettings { /// This node implementation only used for testing different streaming implementation purposes. pub struct MixNode { id: NodeId, - state: MixNodeState, + state: MixnodeState, settings: MixnodeSettings, network_interface: InMemoryNetworkInterface, } @@ -41,7 +40,11 @@ impl MixNode { id, network_interface, settings, - state: MixNodeState::default(), + state: MixnodeState { + node_id: id, + mock_counter: 0, + step_id: 0, + }, } } } @@ -49,7 +52,7 @@ impl MixNode { impl Node for MixNode { type Settings = MixnodeSettings; - type State = MixNodeState; + type State = MixnodeState; fn id(&self) -> NodeId { self.id @@ -65,6 +68,7 @@ impl Node for MixNode { println!(">>>>> Node {}, message: {message:?}", self.id); } + self.state.step_id += 1; self.state.mock_counter += 1; for node_id in self.settings.connected_peers.iter() { diff --git a/network-runner/src/node/mix/state.rs b/network-runner/src/node/mix/state.rs new file mode 100644 index 0000000..10ce58f --- /dev/null +++ b/network-runner/src/node/mix/state.rs @@ -0,0 +1,73 @@ +use std::any::Any; + +use serde::Serialize; + +use crate::{ + node::NodeId, + output_processors::{Record, RecordType, Runtime}, + settings::SimulationSettings, + warding::SimulationState, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct MixnodeState { + pub node_id: NodeId, + pub mock_counter: usize, + pub step_id: usize, +} + +#[derive(Serialize)] +#[serde(untagged)] +pub enum MixnodeRecord { + Runtime(Runtime), + Settings(Box), + Data(Vec>), +} + +impl From for MixnodeRecord { + fn from(value: Runtime) -> Self { + Self::Runtime(value) + } +} + +impl From for MixnodeRecord { + fn from(value: SimulationSettings) -> Self { + Self::Settings(Box::new(value)) + } +} + +impl Record for MixnodeRecord { + type Data = MixnodeState; + + fn record_type(&self) -> RecordType { + match self { + MixnodeRecord::Runtime(_) => RecordType::Meta, + MixnodeRecord::Settings(_) => RecordType::Settings, + MixnodeRecord::Data(_) => RecordType::Data, + } + } + + fn data(&self) -> Vec<&MixnodeState> { + match self { + MixnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(), + _ => vec![], + } + } +} + +impl TryFrom<&SimulationState> for MixnodeRecord { + type Error = anyhow::Error; + + fn try_from(state: &SimulationState) -> Result { + let Ok(states) = state + .nodes + .read() + .iter() + .map(|n| Box::::downcast(Box::new(n.state().clone()))) + .collect::, _>>() + else { + return Err(anyhow::anyhow!("use carnot record on other node")); + }; + Ok(Self::Data(states)) + } +} diff --git a/network-runner/src/streaming/naive.rs b/network-runner/src/streaming/naive.rs index 6b118ce..d99deab 100644 --- a/network-runner/src/streaming/naive.rs +++ b/network-runner/src/streaming/naive.rs @@ -16,7 +16,7 @@ use std::{ #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NaiveSettings { pub path: PathBuf, - #[serde(default = "SubscriberFormat::csv")] + #[serde(default = "SubscriberFormat::json")] pub format: SubscriberFormat, } @@ -191,121 +191,3 @@ fn write_csv_record( } Ok(()) } - -#[cfg(test)] -mod tests { - use std::{collections::HashMap, time::Duration}; - - use consensus_engine::View; - - use crate::{ - network::{ - behaviour::NetworkBehaviour, - regions::{Region, RegionsData}, - Network, NetworkBehaviourKey, - }, - node::{ - dummy_streaming::{DummyStreamingNode, DummyStreamingState}, - Node, NodeId, NodeIdExt, - }, - output_processors::OutData, - runner::SimulationRunner, - warding::SimulationState, - }; - - use super::*; - #[derive(Debug, Clone, Serialize)] - struct NaiveRecord { - states: HashMap, - } - - impl TryFrom<&SimulationState> for NaiveRecord { - type Error = anyhow::Error; - - fn try_from(value: &SimulationState) -> Result { - Ok(Self { - states: value - .nodes - .read() - .iter() - .map(|node| (node.id(), node.current_view())) - .collect(), - }) - } - } - - #[test] - fn test_streaming() { - let simulation_settings = crate::settings::SimulationSettings { - seed: Some(1), - ..Default::default() - }; - - let nodes = (0..6) - .map(|idx| { - Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ())) - as Box< - dyn Node - + std::marker::Send - + Sync, - > - }) - .collect::>(); - let network = Network::new( - RegionsData { - regions: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (region, vec![NodeId::from_index(idx)]) - }) - .collect(), - node_region: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - (NodeId::from_index(idx), region) - }) - .collect(), - region_network_behaviour: (0..6) - .map(|idx| { - let region = match idx % 6 { - 0 => Region::Europe, - 1 => Region::NorthAmerica, - 2 => Region::SouthAmerica, - 3 => Region::Asia, - 4 => Region::Africa, - 5 => Region::Australia, - _ => unreachable!(), - }; - ( - NetworkBehaviourKey::new(region, region), - NetworkBehaviour { - delay: Duration::from_millis(100), - drop: 0.0, - }, - ) - }) - .collect(), - }, - 0, - ); - let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> = - SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap(); - simulation_runner.simulate().unwrap(); - } -}