Merge branch 'master' into reestablish-conn

This commit is contained in:
Al Liu 2023-10-17 15:04:23 +08:00 committed by GitHub
commit 5fb6b968a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 322 additions and 129 deletions

View File

@ -21,11 +21,32 @@ pub struct Layer {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Node {
#[serde(with = "addr_serde")]
pub address: SocketAddr,
#[serde(with = "hex_serde")]
pub public_key: [u8; PUBLIC_KEY_SIZE],
}
mod addr_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::net::{SocketAddr, ToSocketAddrs};
pub fn serialize<S: Serializer>(addr: &SocketAddr, serializer: S) -> Result<S::Ok, S::Error> {
addr.to_string().serialize(serializer)
}
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<SocketAddr, D::Error> {
let s = String::deserialize(deserializer)?;
// Try to convert the string (which might be a domain name) to a SocketAddr.
let mut addrs = s.to_socket_addrs().map_err(serde::de::Error::custom)?;
addrs
.next()
.ok_or_else(|| serde::de::Error::custom("Failed to resolve to a valid address"))
}
}
mod hex_serde {
use super::PUBLIC_KEY_SIZE;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

View File

@ -9,8 +9,8 @@ mixnet-node = { path = "../../mixnet/node" }
nomos-log = { path = "../../nomos-services/log" }
clap = { version = "4", features = ["derive"] }
color-eyre = "0.6.0"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
serde = "1"
serde_yaml = "0.9"
tracing = "0.1"

View File

@ -14,8 +14,8 @@ chrono = "0.4"
futures = "0.3"
http = "0.2.9"
hex = "0.4.3"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }

View File

@ -15,8 +15,8 @@ clap = {version = "4", features = ["derive"] }
serde_yaml = "0.9"
futures = "0.3"
tokio = { version = "1", features = ["sync"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
nomos-libp2p = { path = "../nomos-libp2p"}

View File

@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
tracing = "0.1"
[dev-dependencies]

188
nomos-libp2p/src/config.rs Normal file
View File

@ -0,0 +1,188 @@
use std::time::Duration;
use libp2p::{gossipsub, identity::secp256k1};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmConfig {
// Listening IPv4 address
pub host: std::net::Ipv4Addr,
// TCP listening port. Use 0 for random
pub port: u16,
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
pub node_key: secp256k1::SecretKey,
// Gossipsub config
#[serde(with = "GossipsubConfigDef")]
pub gossipsub_config: gossipsub::Config,
}
impl Default for SwarmConfig {
fn default() -> Self {
Self {
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
port: 60000,
node_key: secp256k1::SecretKey::generate(),
gossipsub_config: gossipsub::Config::default(),
}
}
}
// A partial copy of gossipsub::Config for deriving Serialize/Deserialize remotely
// https://serde.rs/remote-derive.html
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(remote = "gossipsub::Config")]
struct GossipsubConfigDef {
#[serde(getter = "gossipsub::Config::history_length")]
history_legnth: usize,
#[serde(getter = "gossipsub::Config::history_gossip")]
history_gossip: usize,
#[serde(getter = "gossipsub::Config::mesh_n")]
mesh_n: usize,
#[serde(getter = "gossipsub::Config::mesh_n_low")]
mesh_n_low: usize,
#[serde(getter = "gossipsub::Config::mesh_n_high")]
mesh_n_high: usize,
#[serde(getter = "gossipsub::Config::retain_scores")]
retain_scores: usize,
#[serde(getter = "gossipsub::Config::gossip_lazy")]
gossip_lazy: usize,
#[serde(getter = "gossipsub::Config::gossip_factor")]
gossip_factor: f64,
#[serde(getter = "gossipsub::Config::heartbeat_initial_delay")]
heartbeat_initial_delay: Duration,
#[serde(getter = "gossipsub::Config::heartbeat_interval")]
heartbeat_interval: Duration,
#[serde(getter = "gossipsub::Config::fanout_ttl")]
fanout_ttl: Duration,
#[serde(getter = "gossipsub::Config::check_explicit_peers_ticks")]
check_explicit_peers_ticks: u64,
#[serde(getter = "gossipsub::Config::idle_timeout")]
idle_timeout: Duration,
#[serde(getter = "gossipsub::Config::duplicate_cache_time")]
duplicate_cache_time: Duration,
#[serde(getter = "gossipsub::Config::validate_messages")]
validate_messages: bool,
#[serde(getter = "gossipsub::Config::allow_self_origin")]
allow_self_origin: bool,
#[serde(getter = "gossipsub::Config::do_px")]
do_px: bool,
#[serde(getter = "gossipsub::Config::prune_peers")]
prune_peers: usize,
#[serde(getter = "gossipsub::Config::prune_backoff")]
prune_backoff: Duration,
#[serde(getter = "gossipsub::Config::unsubscribe_backoff")]
unsubscribe_backoff: Duration,
#[serde(getter = "gossipsub::Config::backoff_slack")]
backoff_slack: u32,
#[serde(getter = "gossipsub::Config::flood_publish")]
flood_publish: bool,
#[serde(getter = "gossipsub::Config::graft_flood_threshold")]
graft_flood_threshold: Duration,
#[serde(getter = "gossipsub::Config::mesh_outbound_min")]
mesh_outbound_min: usize,
#[serde(getter = "gossipsub::Config::opportunistic_graft_ticks")]
opportunistic_graft_ticks: u64,
#[serde(getter = "gossipsub::Config::opportunistic_graft_peers")]
opportunistic_graft_peers: usize,
#[serde(getter = "gossipsub::Config::gossip_retransimission")]
gossip_retransimission: u32,
#[serde(getter = "gossipsub::Config::max_messages_per_rpc")]
max_messages_per_rpc: Option<usize>,
#[serde(getter = "gossipsub::Config::max_ihave_length")]
max_ihave_length: usize,
#[serde(getter = "gossipsub::Config::max_ihave_messages")]
max_ihave_messages: usize,
#[serde(getter = "gossipsub::Config::iwant_followup_time")]
iwant_followup_time: Duration,
#[serde(getter = "gossipsub::Config::published_message_ids_cache_time")]
published_message_ids_cache_time: Duration,
}
impl From<GossipsubConfigDef> for gossipsub::Config {
fn from(def: GossipsubConfigDef) -> gossipsub::Config {
let mut builder = gossipsub::ConfigBuilder::default();
let mut builder = builder
.history_length(def.history_legnth)
.history_gossip(def.history_gossip)
.mesh_n(def.mesh_n)
.mesh_n_low(def.mesh_n_low)
.mesh_n_high(def.mesh_n_high)
.retain_scores(def.retain_scores)
.gossip_lazy(def.gossip_lazy)
.gossip_factor(def.gossip_factor)
.heartbeat_initial_delay(def.heartbeat_initial_delay)
.heartbeat_interval(def.heartbeat_interval)
.fanout_ttl(def.fanout_ttl)
.check_explicit_peers_ticks(def.check_explicit_peers_ticks)
.idle_timeout(def.idle_timeout)
.duplicate_cache_time(def.duplicate_cache_time)
.allow_self_origin(def.allow_self_origin)
.prune_peers(def.prune_peers)
.prune_backoff(def.prune_backoff)
.unsubscribe_backoff(def.unsubscribe_backoff.as_secs())
.backoff_slack(def.backoff_slack)
.flood_publish(def.flood_publish)
.graft_flood_threshold(def.graft_flood_threshold)
.mesh_outbound_min(def.mesh_outbound_min)
.opportunistic_graft_ticks(def.opportunistic_graft_ticks)
.opportunistic_graft_peers(def.opportunistic_graft_peers)
.gossip_retransimission(def.gossip_retransimission)
.max_messages_per_rpc(def.max_messages_per_rpc)
.max_ihave_length(def.max_ihave_length)
.max_ihave_messages(def.max_ihave_messages)
.iwant_followup_time(def.iwant_followup_time)
.published_message_ids_cache_time(def.published_message_ids_cache_time);
if def.validate_messages {
builder = builder.validate_messages();
}
if def.do_px {
builder = builder.do_px();
}
builder.build().unwrap()
}
}
mod secret_key_serde {
use libp2p::identity::secp256k1;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(key: &secp256k1::SecretKey, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let hex_str = hex::encode(key.to_bytes());
hex_str.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<secp256k1::SecretKey, D::Error>
where
D: Deserializer<'de>,
{
let hex_str = String::deserialize(deserializer)?;
let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?;
secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice())
.map_err(|e| D::Error::custom(format!("{e}")))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_serde() {
let config: SwarmConfig = Default::default();
let serialized = serde_json::to_string(&config).unwrap();
println!("{serialized}");
let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap();
assert_eq!(deserialized.host, config.host);
assert_eq!(deserialized.port, config.port);
assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes());
}
}

View File

@ -1,9 +1,12 @@
mod config;
use std::error::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub use config::SwarmConfig;
pub use libp2p;
use blake2::digest::{consts::U32, Digest};
@ -23,7 +26,6 @@ pub use libp2p::{
};
use libp2p::{swarm::ConnectionId, tcp::tokio::Tcp};
pub use multiaddr::{multiaddr, Multiaddr, Protocol};
use serde::{Deserialize, Serialize};
/// Wraps [`libp2p::Swarm`], and config it for use within Nomos.
pub struct Swarm {
@ -36,27 +38,6 @@ pub struct Behaviour {
gossipsub: gossipsub::Behaviour,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmConfig {
// Listening IPv4 address
pub host: std::net::Ipv4Addr,
// TCP listening port. Use 0 for random
pub port: u16,
// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde", default = "secp256k1::SecretKey::generate")]
pub node_key: secp256k1::SecretKey,
}
impl Default for SwarmConfig {
fn default() -> Self {
Self {
host: std::net::Ipv4Addr::new(0, 0, 0, 0),
port: 60000,
node_key: secp256k1::SecretKey::generate(),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum SwarmError {
#[error("duplicate dialing")]
@ -93,7 +74,7 @@ impl Swarm {
// to prevent all messages from a peer being filtered as duplicates.
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Author(local_peer_id),
gossipsub::ConfigBuilder::default()
gossipsub::ConfigBuilder::from(config.gossipsub_config.clone())
.validation_mode(gossipsub::ValidationMode::None)
.message_id_fn(compute_message_id)
.build()?,
@ -186,45 +167,3 @@ fn compute_message_id(message: &Message) -> MessageId {
hasher.update(&message.data);
MessageId::from(hasher.finalize().to_vec())
}
mod secret_key_serde {
use libp2p::identity::secp256k1;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(key: &secp256k1::SecretKey, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let hex_str = hex::encode(key.to_bytes());
hex_str.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<secp256k1::SecretKey, D::Error>
where
D: Deserializer<'de>,
{
let hex_str = String::deserialize(deserializer)?;
let mut key_bytes = hex::decode(hex_str).map_err(|e| D::Error::custom(format!("{e}")))?;
secp256k1::SecretKey::try_from_bytes(key_bytes.as_mut_slice())
.map_err(|e| D::Error::custom(format!("{e}")))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_serde() {
let config: SwarmConfig = Default::default();
let serialized = serde_json::to_string(&config).unwrap();
println!("{serialized}");
let deserialized: SwarmConfig = serde_json::from_str(serialized.as_str()).unwrap();
assert_eq!(deserialized.host, config.host);
assert_eq!(deserialized.port, config.port);
assert_eq!(deserialized.node_key.to_bytes(), config.node_key.to_bytes());
}
}

View File

@ -14,7 +14,7 @@ futures = "0.3"
nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" }
rand_chacha = "0.3"
rand = "0.8"
serde = { version = "1", features = ["derive"] }

View File

@ -11,7 +11,7 @@ futures = "0.3"
moka = { version = "0.11", features = ["future"] }
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
serde = "1.0"
tracing = "0.1"
tokio = { version = "1", features = ["sync", "macros"] }

View File

@ -37,12 +37,6 @@ where
pub async fn remove(&self, hash: &B::Hash) {
self.0.remove(hash).await;
}
pub fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send> {
// bypass lifetime
let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect();
Box::new(blobs.into_iter())
}
}
#[async_trait::async_trait]
@ -68,7 +62,7 @@ where
Ok(())
}
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send> {
BlobCache::pending_blobs(self)
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob> {
self.0.get(id)
}
}

View File

@ -20,5 +20,5 @@ pub trait DaBackend {
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob>;
}

View File

@ -3,6 +3,7 @@ pub mod network;
// std
use overwatch_rs::DynError;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
@ -32,23 +33,24 @@ where
}
pub enum DaMsg<B: Blob> {
PendingBlobs {
reply_channel: Sender<Box<dyn Iterator<Item = B> + Send>>,
},
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
},
Get {
ids: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
reply_channel: Sender<HashMap<<B as Blob>::Hash, B>>,
},
}
impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaMsg::PendingBlobs { .. } => {
write!(f, "DaMsg::PendingBlobs")
}
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
DaMsg::Get { .. } => {
write!(f, "DaMsg::Get")
}
}
}
}
@ -156,12 +158,6 @@ where
<B::Blob as Blob>::Hash: Debug,
{
match msg {
DaMsg::PendingBlobs { reply_channel } => {
let pending_blobs = backend.pending_blobs();
if reply_channel.send(pending_blobs).is_err() {
tracing::debug!("Could not send pending blobs");
}
}
DaMsg::RemoveBlobs { blobs } => {
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
@ -171,6 +167,14 @@ where
})
.await;
}
DaMsg::Get { ids, reply_channel } => {
let res = ids
.filter_map(|id| backend.get_blob(&id).map(|blob| (id, blob)))
.collect();
if reply_channel.send(res).is_err() {
tracing::error!("Could not returns blobs");
}
}
}
Ok(())
}

View File

@ -25,8 +25,8 @@ clap = { version = "4", features = ["derive", "env"], optional = true }
futures = "0.3"
http = "0.2.9"
hyper = { version = "0.14", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
parking_lot = { version = "0.12", optional = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", optional = true }

View File

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tracing-appender = "0.2"

View File

@ -12,7 +12,7 @@ futures = "0.3"
linked-hash-map = { version = "0.5.6", optional = true }
nomos-network = { path = "../network" }
nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
rand = { version = "0.8", optional = true }
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
@ -23,7 +23,7 @@ chrono = "0.4"
[dev-dependencies]
nomos-log = { path = "../log" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
tokio = { version = "1", features = ["full"] }
blake2 = "0.10"

View File

@ -15,8 +15,8 @@ async-trait = "0.1"
bytes = "1.3"
clap = { version = "4", features = ["derive", "env"], optional = true }
nomos-http = { path = "../http", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
once_cell = "1.16"
parking_lot = "0.12"
prometheus = "0.13"

View File

@ -10,7 +10,7 @@ async-trait = "0.1"
bytes = "1.2"
chrono = { version = "0.4", optional = true }
humantime-serde = { version = "1", optional = true }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
multiaddr = "0.15"
serde = { version = "1.0", features = ["derive"] }
sscanf = { version = "0.4", optional = true }

View File

@ -9,7 +9,7 @@ edition = "2021"
async-trait = "0.1"
tokio = { version = "1", features = ["sync"] }
bytes = "1.2"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
serde = "1.0"
sled = { version = "0.34", optional = true }
thiserror = "1.0"

View File

@ -10,7 +10,7 @@ nomos-consensus = { path = "../nomos-services/consensus" }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"]}
nomos-log = { path = "../nomos-services/log" }
nomos-http = { path = "../nomos-services/http", features = ["http"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
nomos-core = { path = "../nomos-core" }
consensus-engine = { path = "../consensus-engine", features = ["serde"] }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] }

View File

@ -1,10 +1,12 @@
use consensus_engine::View;
use consensus_engine::{Block, NodeId, TimeoutQc, View};
use fraction::Fraction;
use futures::stream::{self, StreamExt};
use nomos_consensus::CarnotInfo;
use std::collections::HashSet;
use tests::{ConsensusConfig, MixNode, Node, NomosNode, SpawnConfig};
const TARGET_VIEW: View = View::new(20);
const DUMMY_NODE_ID: NodeId = NodeId::new([0u8; 32]);
#[tokio::test]
async fn ten_nodes_one_down() {
@ -46,30 +48,75 @@ async fn ten_nodes_one_down() {
.collect::<Vec<_>>()
.await;
// check that they have the same block
let target_blocks = infos
.iter()
.map(|i| i.safe_blocks.values().find(|b| b.view == TARGET_VIEW))
let target_block = assert_block_consensus(&infos, TARGET_VIEW);
// If no node has the target block, check that TARGET_VIEW was reached by timeout_qc.
if target_block.is_none() {
println!("No node has the block with {TARGET_VIEW:?}. Checking timeout_qcs...");
assert_timeout_qc_consensus(&infos, TARGET_VIEW.prev());
}
}
// Check if all nodes have the same block at the specific view.
fn assert_block_consensus<'a>(
consensus_infos: impl IntoIterator<Item = &'a CarnotInfo>,
view: View,
) -> Option<Block> {
let blocks = consensus_infos
.into_iter()
.map(|i| i.safe_blocks.values().find(|b| b.view == view))
.collect::<HashSet<_>>();
// Every nodes must have the same target block (Some(block))
// , or no node must have it (None).
assert_eq!(target_blocks.len(), 1);
assert_eq!(
blocks.len(),
1,
"multiple blocks found at {:?}: {:?}",
view,
blocks
);
// If no node has the target block, check that TARGET_VIEW was reached by timeout_qc.
let target_block = target_blocks.iter().next().unwrap();
if target_block.is_none() {
println!("No node has the block with {TARGET_VIEW:?}. Checking timeout_qcs...");
let timeout_qcs = infos
.iter()
.map(|i| i.last_view_timeout_qc.clone())
.collect::<HashSet<_>>();
assert_eq!(timeout_qcs.len(), 1);
let timeout_qc = timeout_qcs.iter().next().unwrap().clone();
assert!(timeout_qc.is_some());
// NOTE: This check could be failed if other timeout_qc had occured before `infos` were gathered.
// But it should be okay as long as the `timeout` is not too short.
assert_eq!(timeout_qc.unwrap().view(), TARGET_VIEW.prev());
}
blocks.iter().next().unwrap().cloned()
}
// Check if all nodes have the same timeout_qc at the specific view.
fn assert_timeout_qc_consensus<'a>(
consensus_infos: impl IntoIterator<Item = &'a CarnotInfo>,
view: View,
) -> TimeoutQc {
let timeout_qcs = consensus_infos
.into_iter()
.map(|i| {
i.last_view_timeout_qc.clone().map(|timeout_qc| {
// Masking the `sender` field because we want timeout_qcs from different
// senders to be considered the same if all other fields are the same.
TimeoutQc::new(
timeout_qc.view(),
timeout_qc.high_qc().clone(),
DUMMY_NODE_ID,
)
})
})
.collect::<HashSet<_>>();
assert_eq!(
timeout_qcs.len(),
1,
"multiple timeout_qcs found at {:?}: {:?}",
view,
timeout_qcs
);
let timeout_qc = timeout_qcs
.iter()
.next()
.unwrap()
.clone()
.expect("collected timeout_qc shouldn't be None");
// NOTE: This check could be failed if other timeout_qcs had occured
// before `consensus_infos` were gathered.
// But it should be okay as long as the `timeout` is not too short.
assert_eq!(timeout_qc.view(), view);
timeout_qc
}