Record fields for mixnode state (#17)

This commit is contained in:
gusto 2024-11-06 06:14:29 +02:00 committed by GitHub
parent 20f23f09ea
commit 32d41d45b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 101 additions and 139 deletions

View File

@ -23,7 +23,7 @@
"step_time": "10ms",
"runner_settings": "Sync",
"stream_settings": {
"path": "test.csv"
"path": "test.json"
},
"node_count": 3,
"seed": 0,

View File

@ -10,11 +10,10 @@ use crossbeam::channel;
use nomos_simulations_network_runner::network::behaviour::create_behaviours;
use nomos_simulations_network_runner::network::regions::{create_regions, RegionsData};
use nomos_simulations_network_runner::network::{InMemoryNetworkInterface, Network};
use nomos_simulations_network_runner::node::mix::{
MixMessage, MixNode, MixNodeState, MixnodeSettings,
};
use nomos_simulations_network_runner::node::mix::state::{MixnodeRecord, MixnodeState};
use nomos_simulations_network_runner::node::mix::{MixMessage, MixNode, MixnodeSettings};
use nomos_simulations_network_runner::node::{NodeId, NodeIdExt};
use nomos_simulations_network_runner::output_processors::{OutData, Record};
use nomos_simulations_network_runner::output_processors::Record;
use nomos_simulations_network_runner::runner::{BoxedNode, SimulationRunnerHandle};
#[cfg(feature = "polars")]
use nomos_simulations_network_runner::streaming::polars::PolarsSubscriber;
@ -113,7 +112,7 @@ fn create_boxed_mixnode(
simulation_settings: SimulationSettings,
no_netcap: bool,
mixnode_settings: MixnodeSettings,
) -> BoxedNode<MixnodeSettings, MixNodeState> {
) -> BoxedNode<MixnodeSettings, MixnodeState> {
let (node_message_broadcast_sender, node_message_broadcast_receiver) = channel::unbounded();
let (node_message_sender, node_message_receiver) = channel::unbounded();
// Dividing milliseconds in second by milliseconds in the step.
@ -156,22 +155,26 @@ where
T: Serialize + Clone + 'static,
{
let stream_settings = settings.stream_settings.clone();
let runner =
SimulationRunner::<_, OutData, S, T>::new(network, nodes, Default::default(), settings)?;
let runner = SimulationRunner::<_, MixnodeRecord, S, T>::new(
network,
nodes,
Default::default(),
settings,
)?;
let handle = match stream_type {
Some(StreamType::Naive) => {
let settings = stream_settings.unwrap_naive();
runner.simulate_and_subscribe::<NaiveSubscriber<OutData>>(settings)?
runner.simulate_and_subscribe::<NaiveSubscriber<MixnodeRecord>>(settings)?
}
Some(StreamType::IO) => {
let settings = stream_settings.unwrap_io();
runner.simulate_and_subscribe::<IOSubscriber<OutData>>(settings)?
runner.simulate_and_subscribe::<IOSubscriber<MixnodeRecord>>(settings)?
}
#[cfg(feature = "polars")]
Some(StreamType::Polars) => {
let settings = stream_settings.unwrap_polars();
runner.simulate_and_subscribe::<PolarsSubscriber<OutData>>(settings)?
runner.simulate_and_subscribe::<PolarsSubscriber<MixnodeRecord>>(settings)?
}
None => runner.simulate()?,
};

View File

@ -1,13 +1,11 @@
pub mod state;
use super::{Node, NodeId};
use crate::network::{InMemoryNetworkInterface, NetworkInterface, PayloadSize};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use state::MixnodeState;
use std::time::Duration;
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct MixNodeState {
pub mock_counter: usize,
}
#[derive(Debug, Clone)]
pub enum MixMessage {
Dummy(String),
@ -19,6 +17,7 @@ impl PayloadSize for MixMessage {
}
}
#[derive(Clone, Default, Deserialize)]
pub struct MixnodeSettings {
pub connected_peers: Vec<NodeId>,
}
@ -26,7 +25,7 @@ pub struct MixnodeSettings {
/// This node implementation only used for testing different streaming implementation purposes.
pub struct MixNode {
id: NodeId,
state: MixNodeState,
state: MixnodeState,
settings: MixnodeSettings,
network_interface: InMemoryNetworkInterface<MixMessage>,
}
@ -41,7 +40,11 @@ impl MixNode {
id,
network_interface,
settings,
state: MixNodeState::default(),
state: MixnodeState {
node_id: id,
mock_counter: 0,
step_id: 0,
},
}
}
}
@ -49,7 +52,7 @@ impl MixNode {
impl Node for MixNode {
type Settings = MixnodeSettings;
type State = MixNodeState;
type State = MixnodeState;
fn id(&self) -> NodeId {
self.id
@ -65,6 +68,7 @@ impl Node for MixNode {
println!(">>>>> Node {}, message: {message:?}", self.id);
}
self.state.step_id += 1;
self.state.mock_counter += 1;
for node_id in self.settings.connected_peers.iter() {

View File

@ -0,0 +1,73 @@
use std::any::Any;
use serde::Serialize;
use crate::{
node::NodeId,
output_processors::{Record, RecordType, Runtime},
settings::SimulationSettings,
warding::SimulationState,
};
#[derive(Debug, Clone, Serialize)]
pub struct MixnodeState {
pub node_id: NodeId,
pub mock_counter: usize,
pub step_id: usize,
}
#[derive(Serialize)]
#[serde(untagged)]
pub enum MixnodeRecord {
Runtime(Runtime),
Settings(Box<SimulationSettings>),
Data(Vec<Box<MixnodeState>>),
}
impl From<Runtime> for MixnodeRecord {
fn from(value: Runtime) -> Self {
Self::Runtime(value)
}
}
impl From<SimulationSettings> for MixnodeRecord {
fn from(value: SimulationSettings) -> Self {
Self::Settings(Box::new(value))
}
}
impl Record for MixnodeRecord {
type Data = MixnodeState;
fn record_type(&self) -> RecordType {
match self {
MixnodeRecord::Runtime(_) => RecordType::Meta,
MixnodeRecord::Settings(_) => RecordType::Settings,
MixnodeRecord::Data(_) => RecordType::Data,
}
}
fn data(&self) -> Vec<&MixnodeState> {
match self {
MixnodeRecord::Data(d) => d.iter().map(AsRef::as_ref).collect(),
_ => vec![],
}
}
}
impl<S, T: Clone + Serialize + 'static> TryFrom<&SimulationState<S, T>> for MixnodeRecord {
type Error = anyhow::Error;
fn try_from(state: &SimulationState<S, T>) -> Result<Self, Self::Error> {
let Ok(states) = state
.nodes
.read()
.iter()
.map(|n| Box::<dyn Any + 'static>::downcast(Box::new(n.state().clone())))
.collect::<Result<Vec<_>, _>>()
else {
return Err(anyhow::anyhow!("use carnot record on other node"));
};
Ok(Self::Data(states))
}
}

View File

@ -16,7 +16,7 @@ use std::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NaiveSettings {
pub path: PathBuf,
#[serde(default = "SubscriberFormat::csv")]
#[serde(default = "SubscriberFormat::json")]
pub format: SubscriberFormat,
}
@ -191,121 +191,3 @@ fn write_csv_record<W: std::io::Write, R: Record>(
}
Ok(())
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, time::Duration};
use consensus_engine::View;
use crate::{
network::{
behaviour::NetworkBehaviour,
regions::{Region, RegionsData},
Network, NetworkBehaviourKey,
},
node::{
dummy_streaming::{DummyStreamingNode, DummyStreamingState},
Node, NodeId, NodeIdExt,
},
output_processors::OutData,
runner::SimulationRunner,
warding::SimulationState,
};
use super::*;
#[derive(Debug, Clone, Serialize)]
struct NaiveRecord {
states: HashMap<NodeId, View>,
}
impl<S, T: Serialize> TryFrom<&SimulationState<S, T>> for NaiveRecord {
type Error = anyhow::Error;
fn try_from(value: &SimulationState<S, T>) -> Result<Self, Self::Error> {
Ok(Self {
states: value
.nodes
.read()
.iter()
.map(|node| (node.id(), node.current_view()))
.collect(),
})
}
}
#[test]
fn test_streaming() {
let simulation_settings = crate::settings::SimulationSettings {
seed: Some(1),
..Default::default()
};
let nodes = (0..6)
.map(|idx| {
Box::new(DummyStreamingNode::new(NodeId::from_index(idx), ()))
as Box<
dyn Node<State = DummyStreamingState, Settings = ()>
+ std::marker::Send
+ Sync,
>
})
.collect::<Vec<_>>();
let network = Network::new(
RegionsData {
regions: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(region, vec![NodeId::from_index(idx)])
})
.collect(),
node_region: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(NodeId::from_index(idx), region)
})
.collect(),
region_network_behaviour: (0..6)
.map(|idx| {
let region = match idx % 6 {
0 => Region::Europe,
1 => Region::NorthAmerica,
2 => Region::SouthAmerica,
3 => Region::Asia,
4 => Region::Africa,
5 => Region::Australia,
_ => unreachable!(),
};
(
NetworkBehaviourKey::new(region, region),
NetworkBehaviour {
delay: Duration::from_millis(100),
drop: 0.0,
},
)
})
.collect(),
},
0,
);
let simulation_runner: SimulationRunner<(), OutData, (), DummyStreamingState> =
SimulationRunner::new(network, nodes, Default::default(), simulation_settings).unwrap();
simulation_runner.simulate().unwrap();
}
}