diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index ab3cb407..e286d1eb 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -921,6 +921,7 @@ mod tests { }; let serialized = serde_json::to_string(&info).unwrap(); + eprintln!("{serialized}"); assert_eq!( serialized, r#"{"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"current_view":1,"highest_voted_view":-1,"local_high_qc":{"view":0,"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"safe_blocks":[[[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],{"view":0,"parent_qc":{"Standard":{"view":0,"id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"leader_proof":{"LeaderId":{"leader_id":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}}}]],"last_view_timeout_qc":null,"committed_blocks":[[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]]}"# diff --git a/simulations/src/node/carnot/mod.rs b/simulations/src/node/carnot/mod.rs index 3bd7032b..551ff3a4 100644 --- a/simulations/src/node/carnot/mod.rs +++ b/simulations/src/node/carnot/mod.rs @@ -3,6 +3,7 @@ mod event_builder; mod message_cache; pub mod messages; +mod serde_util; mod tally; mod timeout; @@ -13,7 +14,7 @@ use std::{collections::HashMap, time::Duration}; // crates use bls_signatures::PrivateKey; use rand::Rng; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; // internal use self::messages::CarnotMessage; use super::{Node, NodeId}; @@ -85,7 +86,6 @@ impl serde::Serialize for CarnotState { where S: serde::Serializer, { - use serde::ser::SerializeStruct; if let Some(rs) = RECORD_SETTINGS.get() { let keys = rs .iter() @@ -98,53 +98,54 @@ impl serde::Serialize for CarnotState { }) .collect::>(); - let mut ser = serializer.serialize_struct("CarnotState", keys.len())?; + let mut state = serde_util::CarnotState::default(); for k in keys { match k.trim() { - NODE_ID => ser.serialize_field(NODE_ID, &self.node_id)?, - CURRENT_VIEW => ser.serialize_field(CURRENT_VIEW, &self.current_view)?, - HIGHEST_VOTED_VIEW => { - ser.serialize_field(HIGHEST_VOTED_VIEW, &self.highest_voted_view)? + NODE_ID => { + state.node_id = Some(self.node_id.into()); + } + CURRENT_VIEW => { + state.current_view = Some(self.current_view); + } + HIGHEST_VOTED_VIEW => { + state.highest_voted_view = Some(self.highest_voted_view); + } + LOCAL_HIGH_QC => { + state.local_high_qc = Some((&self.local_high_qc).into()); } - LOCAL_HIGH_QC => ser.serialize_field(LOCAL_HIGH_QC, &self.local_high_qc)?, SAFE_BLOCKS => { - #[derive(Serialize)] - #[serde(transparent)] - struct SafeBlockHelper<'a> { - #[serde(serialize_with = "serialize_blocks")] - safe_blocks: &'a HashMap, - } - ser.serialize_field( - SAFE_BLOCKS, - &SafeBlockHelper { - safe_blocks: &self.safe_blocks, - }, - )?; + state.safe_blocks = Some((&self.safe_blocks).into()); } LAST_VIEW_TIMEOUT_QC => { - ser.serialize_field(LAST_VIEW_TIMEOUT_QC, &self.last_view_timeout_qc)? + state.last_view_timeout_qc = + Some(self.last_view_timeout_qc.as_ref().map(From::from)); } LATEST_COMMITTED_BLOCK => { - ser.serialize_field(LATEST_COMMITTED_BLOCK, &self.latest_committed_block)? + state.latest_committed_block = Some((&self.latest_committed_block).into()); } LATEST_COMMITTED_VIEW => { - ser.serialize_field(LATEST_COMMITTED_VIEW, &self.latest_committed_view)? + state.latest_committed_view = Some(self.latest_committed_view); + } + ROOT_COMMITTEE => { + state.root_committee = Some((&self.root_committee).into()); } - ROOT_COMMITTEE => ser.serialize_field(ROOT_COMMITTEE, &self.root_committee)?, PARENT_COMMITTEE => { - ser.serialize_field(PARENT_COMMITTEE, &self.parent_committee)? + state.parent_committee = + Some(self.parent_committee.as_ref().map(From::from)); } CHILD_COMMITTEES => { - ser.serialize_field(CHILD_COMMITTEES, &self.child_committees)? + state.child_committees = Some(self.child_committees.as_slice().into()); } COMMITTED_BLOCKS => { - ser.serialize_field(COMMITTED_BLOCKS, &self.committed_blocks)? + state.committed_blocks = Some(self.committed_blocks.as_slice().into()); + } + STEP_DURATION => { + state.step_duration = Some(self.step_duration); } - STEP_DURATION => ser.serialize_field(STEP_DURATION, &self.step_duration)?, _ => {} } } - ser.end() + state.serialize(serializer) } else { serializer.serialize_none() } diff --git a/simulations/src/node/carnot/serde_util.rs b/simulations/src/node/carnot/serde_util.rs new file mode 100644 index 00000000..8b5cf654 --- /dev/null +++ b/simulations/src/node/carnot/serde_util.rs @@ -0,0 +1,377 @@ +use std::{collections::HashMap, time::Duration}; + +use serde::{ + ser::{SerializeSeq, Serializer}, + Deserialize, Serialize, +}; + +use self::{ + serde_block::BlockHelper, + serde_id::{BlockIdHelper, NodeIdHelper}, + standard_qc::StandardQcHelper, + timeout_qc::TimeoutQcHelper, +}; +use consensus_engine::{AggregateQc, Block, BlockId, Committee, Qc, StandardQc, TimeoutQc, View}; + +#[serde_with::skip_serializing_none] +#[serde_with::serde_as] +#[derive(Serialize, Default)] +pub(crate) struct CarnotState<'a> { + pub(crate) node_id: Option, + pub(crate) current_view: Option, + pub(crate) highest_voted_view: Option, + pub(crate) local_high_qc: Option, + pub(crate) safe_blocks: Option>, + pub(crate) last_view_timeout_qc: Option>>, + pub(crate) latest_committed_block: Option, + pub(crate) latest_committed_view: Option, + pub(crate) root_committee: Option>, + pub(crate) parent_committee: Option>>, + pub(crate) child_committees: Option>, + pub(crate) committed_blocks: Option>, + #[serde_as(as = "Option")] + pub(crate) step_duration: Option, +} + +impl<'a> From<&'a super::CarnotState> for CarnotState<'a> { + fn from(value: &'a super::CarnotState) -> Self { + Self { + node_id: Some(value.node_id.into()), + current_view: Some(value.current_view), + highest_voted_view: Some(value.highest_voted_view), + local_high_qc: Some(StandardQcHelper::from(&value.local_high_qc)), + safe_blocks: Some(SafeBlocksHelper::from(&value.safe_blocks)), + last_view_timeout_qc: Some(value.last_view_timeout_qc.as_ref().map(From::from)), + latest_committed_block: Some(BlockHelper::from(&value.latest_committed_block)), + latest_committed_view: Some(value.latest_committed_view), + root_committee: Some(CommitteeHelper::from(&value.root_committee)), + parent_committee: Some(value.parent_committee.as_ref().map(From::from)), + child_committees: Some(CommitteesHelper::from(value.child_committees.as_slice())), + committed_blocks: Some(CommittedBlockHelper::from( + value.committed_blocks.as_slice(), + )), + step_duration: Some(value.step_duration), + } + } +} + +pub(crate) struct SafeBlocksHelper<'a>(&'a HashMap); + +impl<'a> From<&'a HashMap> for SafeBlocksHelper<'a> { + fn from(val: &'a HashMap) -> Self { + Self(val) + } +} + +impl<'a> Serialize for SafeBlocksHelper<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let iter = self.0.values(); + let mut s = serializer.serialize_seq(Some(iter.size_hint().0))?; + for b in iter { + s.serialize_element(&BlockHelper::from(b))?; + } + s.end() + } +} + +pub(crate) struct CommitteeHelper<'a>(&'a Committee); + +impl<'a> From<&'a Committee> for CommitteeHelper<'a> { + fn from(val: &'a Committee) -> Self { + Self(val) + } +} + +impl<'a> Serialize for CommitteeHelper<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let iter = self.0.iter(); + let mut s = serializer.serialize_seq(Some(iter.size_hint().0))?; + for id in iter { + s.serialize_element(&NodeIdHelper::from(*id))?; + } + s.end() + } +} + +pub(crate) struct CommitteesHelper<'a>(&'a [Committee]); + +impl<'a> From<&'a [Committee]> for CommitteesHelper<'a> { + fn from(val: &'a [Committee]) -> Self { + Self(val) + } +} + +impl<'a> Serialize for CommitteesHelper<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut s = serializer.serialize_seq(Some(self.0.len()))?; + for c in self.0 { + s.serialize_element(&CommitteeHelper::from(c))?; + } + s.end() + } +} + +pub(crate) struct CommittedBlockHelper<'a>(&'a [BlockId]); + +impl<'a> From<&'a [BlockId]> for CommittedBlockHelper<'a> { + fn from(val: &'a [BlockId]) -> Self { + Self(val) + } +} + +impl<'a> Serialize for CommittedBlockHelper<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut s = serializer.serialize_seq(Some(self.0.len()))?; + for c in self.0 { + s.serialize_element(&BlockIdHelper::from(*c))?; + } + s.end() + } +} + +pub(crate) mod standard_qc { + use super::*; + + #[derive(Serialize)] + pub(crate) struct StandardQcHelper { + view: View, + id: serde_id::BlockIdHelper, + } + + impl From<&StandardQc> for StandardQcHelper { + fn from(val: &StandardQc) -> Self { + Self { + view: val.view, + id: val.id.into(), + } + } + } + + pub fn serialize(t: &StandardQc, serializer: S) -> Result { + StandardQcHelper::from(t).serialize(serializer) + } +} + +pub(crate) mod aggregate_qc { + use super::*; + + #[derive(Serialize)] + pub(crate) struct AggregateQcHelper<'a> { + #[serde(serialize_with = "standard_qc::serialize")] + high_qc: &'a StandardQc, + view: View, + } + + impl<'a> From<&'a AggregateQc> for AggregateQcHelper<'a> { + fn from(t: &'a AggregateQc) -> Self { + Self { + high_qc: &t.high_qc, + view: t.view, + } + } + } + + pub fn serialize( + t: &AggregateQc, + serializer: S, + ) -> Result { + AggregateQcHelper::from(t).serialize(serializer) + } +} + +pub(crate) mod qc { + use super::*; + + #[derive(Serialize)] + #[serde(untagged)] + pub(crate) enum QcHelper<'a> { + Standard(#[serde(serialize_with = "standard_qc::serialize")] &'a StandardQc), + Aggregate(aggregate_qc::AggregateQcHelper<'a>), + } + + pub fn serialize(t: &Qc, serializer: S) -> Result { + let qc = match t { + Qc::Standard(s) => QcHelper::Standard(s), + Qc::Aggregated(a) => QcHelper::Aggregate(aggregate_qc::AggregateQcHelper::from(a)), + }; + qc.serialize(serializer) + } +} + +pub(crate) mod timeout_qc { + use super::*; + use consensus_engine::NodeId; + + #[derive(Serialize)] + pub(crate) struct TimeoutQcHelper<'a> { + view: View, + #[serde(serialize_with = "standard_qc::serialize")] + high_qc: &'a StandardQc, + #[serde(serialize_with = "serde_id::serialize_node_id")] + sender: NodeId, + } + + impl<'a> From<&'a TimeoutQc> for TimeoutQcHelper<'a> { + fn from(value: &'a TimeoutQc) -> Self { + Self { + view: value.view(), + high_qc: value.high_qc(), + sender: value.sender(), + } + } + } + + pub fn serialize( + t: &TimeoutQc, + serializer: S, + ) -> Result { + TimeoutQcHelper::from(t).serialize(serializer) + } +} + +pub(crate) mod serde_block { + use super::*; + + #[derive(Serialize)] + pub(crate) struct BlockHelper { + view: View, + id: BlockIdHelper, + } + + impl From<&Block> for BlockHelper { + fn from(val: &Block) -> Self { + Self { + view: val.view, + id: val.id.into(), + } + } + } + + pub fn serialize(t: &Block, serializer: S) -> Result { + BlockHelper::from(t).serialize(serializer) + } +} + +pub(crate) mod serde_id { + use consensus_engine::{BlockId, NodeId}; + + use super::*; + + #[derive(Serialize, Deserialize)] + pub(crate) struct BlockIdHelper(#[serde(with = "serde_array32")] [u8; 32]); + + impl From for BlockIdHelper { + fn from(val: BlockId) -> Self { + Self(val.into()) + } + } + + #[derive(Serialize, Deserialize)] + pub(crate) struct NodeIdHelper(#[serde(with = "serde_array32")] [u8; 32]); + + impl From for NodeIdHelper { + fn from(val: NodeId) -> Self { + Self(val.into()) + } + } + + pub fn serialize_node_id( + t: &NodeId, + serializer: S, + ) -> Result { + NodeIdHelper::from(*t).serialize(serializer) + } + + pub(crate) mod serde_array32 { + use super::*; + use std::cell::RefCell; + + const MAX_SERIALIZATION_LENGTH: usize = 32 * 2 + 2; + + thread_local! { + static STRING_BUFFER: RefCell = RefCell::new(String::with_capacity(MAX_SERIALIZATION_LENGTH)); + } + + pub fn serialize( + t: &[u8; 32], + serializer: S, + ) -> Result { + if serializer.is_human_readable() { + STRING_BUFFER.with(|s| { + let mut s = s.borrow_mut(); + s.clear(); + s.push_str("0x"); + for v in t { + std::fmt::write(&mut *s, format_args!("{:02x}", v)).unwrap(); + } + s.serialize(serializer) + }) + } else { + t.serialize(serializer) + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result<[u8; 32], D::Error> + where + D: serde::Deserializer<'de>, + { + if deserializer.is_human_readable() { + <&str>::deserialize(deserializer).and_then(|s| { + super::parse_hex_from_str::<32>(s) + .map_err(::custom) + }) + } else { + let x = <&[u8]>::deserialize(deserializer)?; + <[u8; 32]>::try_from(x).map_err(::custom) + } + } + } + + #[derive(Debug, thiserror::Error)] + enum DecodeError { + #[error("expected str of length {expected} but got length {actual}")] + UnexpectedSize { expected: usize, actual: usize }, + #[error("invalid character pair '{0}{1}'")] + InvalidCharacter(char, char), + } + + fn parse_hex_from_str(mut s: &str) -> Result<[u8; N], DecodeError> { + // check if we start with 0x or not + let prefix_len = if s.starts_with("0x") || s.starts_with("0X") { + s = &s[2..]; + 2 + } else { + 0 + }; + + if s.len() != N * 2 { + return Err(DecodeError::UnexpectedSize { + expected: N * 2 + prefix_len, + actual: s.len(), + }); + } + + let mut output = [0; N]; + + for (chars, byte) in s.as_bytes().chunks_exact(2).zip(output.iter_mut()) { + let (l, r) = (chars[0] as char, chars[1] as char); + match (l.to_digit(16), r.to_digit(16)) { + (Some(l), Some(r)) => *byte = (l as u8) << 4 | r as u8, + (_, _) => return Err(DecodeError::InvalidCharacter(l, r)), + }; + } + Ok(output) + } +} diff --git a/tests/Cargo.toml b/tests/Cargo.toml index de28f80d..cd3d964c 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -39,5 +39,5 @@ path = "src/tests/unhappy.rs" [features] metrics = ["nomos-node/metrics"] -waku = ["nomos-network/waku", "nomos-mempool/waku", "waku-bindings"] -libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p"] \ No newline at end of file +waku = ["nomos-network/waku", "nomos-mempool/waku", "nomos-node/waku", "waku-bindings"] +libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p", "nomos-node/libp2p"] \ No newline at end of file