Split long consensus response in separate APIs (#502)

* Split long consensus response in separate APIs

Consensus info was returning the full list of blocks even though
that can get quite large with time. Instead, this commit change
that API to return a constant size message and adds a new one to
return a chain of blocks with user specified endings.

* Update nomos-services/consensus/src/lib.rs

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>

* Fix test

---------

Co-authored-by: Youngjoon Lee <taxihighway@gmail.com>
This commit is contained in:
Giacomo Pasini 2023-11-06 12:43:48 +01:00 committed by GitHub
parent 1c9528e38f
commit 56e8506704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 172 additions and 42 deletions

View File

@ -47,6 +47,15 @@ impl<O: Overlay> Carnot<O> {
&self.safe_blocks
}
/// Return the most recent safe block
pub fn tip(&self) -> Block {
self.safe_blocks
.iter()
.max_by_key(|(_, b)| b.view)
.map(|(_, b)| b.clone())
.unwrap()
}
/// Upon reception of a block
///
/// Preconditions:

View File

@ -1,6 +1,7 @@
mod libp2p;
use consensus_engine::BlockId;
use libp2p::*;
use std::collections::HashMap;
// std
// crates
@ -88,6 +89,23 @@ pub fn carnot_info_bridge(
}))
}
pub fn block_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
let (channel, mut http_request_channel) =
build_http_bridge::<Carnot, AxumBackend, _>(handle, HttpMethod::GET, "blocks")
.await
.unwrap();
while let Some(HttpRequest { res_tx, query, .. }) = http_request_channel.recv().await {
if let Err(e) = handle_block_info_req(&channel, query, res_tx).await {
error!(e);
}
}
Ok(())
}))
}
pub fn cl_mempool_metrics_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
@ -187,6 +205,43 @@ pub async fn handle_block_get_req(
Ok(())
}
pub async fn handle_block_info_req(
carnot_channel: &OutboundRelay<ConsensusMsg>,
query: HashMap<String, String>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
fn parse_block_id(field: Option<&String>) -> Result<Option<BlockId>, overwatch_rs::DynError> {
field
.map(|id| {
hex::decode(id)
.map_err(|e| e.into())
.and_then(|bytes| {
<[u8; 32]>::try_from(bytes)
.map_err(|e| format!("expected 32 bytes found {}", e.len()).into())
})
.map(BlockId::from)
})
.transpose()
}
const QUERY_FROM: &str = "from";
const QUERY_TO: &str = "to";
let (sender, receiver) = oneshot::channel();
carnot_channel
.send(ConsensusMsg::GetBlocks {
from: parse_block_id(query.get(QUERY_FROM))?,
to: parse_block_id(query.get(QUERY_TO))?,
tx: sender,
})
.await
.map_err(|(e, _)| e)?;
let blocks = receiver.await.unwrap();
res_tx.send(Ok(serde_json::to_vec(&blocks)?.into())).await?;
Ok(())
}
pub async fn handle_mempool_status_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
payload: Option<Bytes>,

View File

@ -60,6 +60,7 @@ fn main() -> Result<()> {
let bridges: Vec<HttpBridge> = vec![
Arc::new(Box::new(bridges::carnot_info_bridge)),
Arc::new(Box::new(bridges::block_info_bridge)),
// Due to a limitation in the current api system, we can't connect a single endopint to multiple services
// which means we need two different paths for complete mempool metrics.
Arc::new(Box::new(bridges::cl_mempool_metrics_bridge)),

View File

@ -5,7 +5,7 @@ mod tally;
mod task_manager;
// std
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::fmt::Debug;
use std::hash::Hash;
use std::pin::Pin;
@ -419,14 +419,37 @@ where
current_view: carnot.current_view(),
highest_voted_view: carnot.highest_voted_view(),
local_high_qc: carnot.high_qc(),
safe_blocks: carnot.safe_blocks().clone(),
tip: carnot.tip(),
last_view_timeout_qc: carnot.last_view_timeout_qc(),
committed_blocks: carnot.latest_committed_blocks(),
last_committed_block: carnot.safe_blocks()
[&carnot.latest_committed_blocks()[0]]
.clone(),
};
tx.send(info).unwrap_or_else(|e| {
tracing::error!("Could not send consensus info through channel: {:?}", e)
});
}
ConsensusMsg::GetBlocks { from, to, tx } => {
// default to tip block if not present
let from = from.unwrap_or(carnot.tip().id);
// default to genesis block if not present
let to = to.unwrap_or(carnot.genesis_block().id);
let mut res = Vec::new();
let mut cur = from;
let blocks = carnot.safe_blocks();
while let Some(block) = blocks.get(&cur) {
res.push(block.clone());
if cur == to || cur == carnot.genesis_block().id {
break;
}
cur = block.parent();
}
tx.send(res)
.unwrap_or_else(|_| tracing::error!("could not send blocks through channel"));
}
}
}
@ -1073,7 +1096,17 @@ enum Event<Tx: Clone + Hash + Eq, BlobCertificate: Clone + Eq + Hash> {
#[derive(Debug)]
pub enum ConsensusMsg {
Info { tx: Sender<CarnotInfo> },
Info {
tx: Sender<CarnotInfo>,
},
/// Walk the chain back from 'from' (the most recent block) to
/// 'to' (the oldest block). If 'from' is None, the tip of the chain is used as a starting
/// point. If 'to' is None or not known to the node, the genesis block is used as an end point.
GetBlocks {
from: Option<BlockId>,
to: Option<BlockId>,
tx: Sender<Vec<consensus_engine::Block>>,
},
}
impl RelayMessage for ConsensusMsg {}
@ -1086,10 +1119,9 @@ pub struct CarnotInfo {
pub current_view: View,
pub highest_voted_view: View,
pub local_high_qc: StandardQc,
#[serde_as(as = "Vec<(_, _)>")]
pub safe_blocks: HashMap<BlockId, consensus_engine::Block>,
pub tip: consensus_engine::Block,
pub last_view_timeout_qc: Option<TimeoutQc>,
pub committed_blocks: Vec<BlockId>,
pub last_committed_block: consensus_engine::Block,
}
async fn get_mempool_contents<Item, Key>(
@ -1138,29 +1170,36 @@ mod tests {
view: View::new(0),
id: BlockId::zeros(),
},
safe_blocks: HashMap::from([(
BlockId::zeros(),
Block {
id: BlockId::zeros(),
tip: Block {
id: BlockId::zeros(),
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: View::new(0),
id: BlockId::zeros(),
}),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},
id: BlockId::zeros(),
}),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},
)]),
},
last_view_timeout_qc: None,
committed_blocks: vec![BlockId::zeros()],
last_committed_block: Block {
id: BlockId::zeros(),
view: View::new(0),
parent_qc: Qc::Standard(StandardQc {
view: View::new(0),
id: BlockId::zeros(),
}),
leader_proof: LeaderProof::LeaderId {
leader_id: NodeId::new([0; 32]),
},
},
};
let serialized = serde_json::to_string(&info).unwrap();
eprintln!("{serialized}");
assert_eq!(
serialized,
r#"{"id":"0x0000000000000000000000000000000000000000000000000000000000000000","current_view":1,"highest_voted_view":-1,"local_high_qc":{"view":0,"id":"0x0000000000000000000000000000000000000000000000000000000000000000"},"safe_blocks":[["0x0000000000000000000000000000000000000000000000000000000000000000",{"id":"0x0000000000000000000000000000000000000000000000000000000000000000","view":0,"parent_qc":{"Standard":{"view":0,"id":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"leader_proof":{"LeaderId":{"leader_id":"0x0000000000000000000000000000000000000000000000000000000000000000"}}}]],"last_view_timeout_qc":null,"committed_blocks":["0x0000000000000000000000000000000000000000000000000000000000000000"]}"#
r#"{"id":"0x0000000000000000000000000000000000000000000000000000000000000000","current_view":1,"highest_voted_view":-1,"local_high_qc":{"view":0,"id":"0x0000000000000000000000000000000000000000000000000000000000000000"},"tip":{"id":"0x0000000000000000000000000000000000000000000000000000000000000000","view":0,"parent_qc":{"Standard":{"view":0,"id":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"leader_proof":{"LeaderId":{"leader_id":"0x0000000000000000000000000000000000000000000000000000000000000000"}}},"last_view_timeout_qc":null,"last_committed_block":{"id":"0x0000000000000000000000000000000000000000000000000000000000000000","view":0,"parent_qc":{"Standard":{"view":0,"id":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"leader_proof":{"LeaderId":{"leader_id":"0x0000000000000000000000000000000000000000000000000000000000000000"}}}}"#
);
let deserialized: CarnotInfo = serde_json::from_str(&serialized).unwrap();

View File

@ -32,6 +32,7 @@ const CARNOT_INFO_API: &str = "carnot/info";
const STORAGE_BLOCKS_API: &str = "storage/block";
const MEMPOOL_API: &str = "mempool-";
const LOGS_PREFIX: &str = "__logs";
const GET_BLOCKS_INFO: &str = "carnot/blocks";
pub struct NomosNode {
addr: SocketAddr,
@ -55,6 +56,10 @@ impl Drop for NomosNode {
}
impl NomosNode {
pub fn id(&self) -> NodeId {
NodeId::from(self.config.consensus.private_key)
}
pub async fn spawn(mut config: Config) -> Self {
// Waku stores the messages in a db file in the current dir, we need a different
// directory for each node to avoid conflicts
@ -139,6 +144,29 @@ impl NomosNode {
}
}
pub async fn get_blocks_info(
&self,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Vec<consensus_engine::Block> {
let mut req = CLIENT.get(format!("http://{}/{}", self.addr, GET_BLOCKS_INFO));
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
req.send()
.await
.unwrap()
.json::<Vec<consensus_engine::Block>>()
.await
.unwrap()
}
// not async so that we can use this in `Drop`
pub fn get_logs_from_file(&self) -> String {
println!(

View File

@ -36,28 +36,24 @@ async fn happy_test(nodes: &[NomosNode]) {
};
let infos = stream::iter(nodes)
.then(|n| async move { n.consensus_info().await })
.then(|n| async move { n.get_blocks_info(None, None).await })
.collect::<Vec<_>>()
.await;
// check that they have the same block
let blocks = infos
.iter()
.map(|i| {
i.safe_blocks
.values()
.find(|b| b.view == TARGET_VIEW)
.unwrap()
})
.map(|i| i.iter().find(|b| b.view == TARGET_VIEW).unwrap())
.collect::<HashSet<_>>();
// try to see if we have invalid blocks
let invalid_blocks = infos
.iter()
.flat_map(|i| {
i.safe_blocks.values().filter_map(|b| match &b.parent_qc {
.zip(nodes.iter())
.flat_map(|(blocks, node)| {
blocks.iter().filter_map(|b| match &b.parent_qc {
Qc::Standard(_) => None,
Qc::Aggregated(_) => Some(Info {
node_id: i.id.to_string(),
node_id: node.id().to_string(),
block_id: b.id.to_string(),
view: b.view,
}),
@ -92,7 +88,7 @@ async fn test_get_block() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
happy_test(&nodes).await;
let id = nodes[0].consensus_info().await.committed_blocks[0];
let id = nodes[0].consensus_info().await.last_committed_block.id;
tokio::time::timeout(Duration::from_secs(10), async {
while nodes[0].get_block(id).await.is_none() {
tokio::time::sleep(Duration::from_millis(100)).await;

View File

@ -43,12 +43,17 @@ async fn ten_nodes_one_down() {
} => {}
};
let infos = stream::iter(nodes)
.then(|n| async move { n.consensus_info().await })
.collect::<Vec<_>>()
let (infos, blocks): (Vec<_>, Vec<_>) = stream::iter(nodes)
.then(|n| async move {
(
n.consensus_info().await,
n.get_blocks_info(None, None).await,
)
})
.unzip()
.await;
let target_block = assert_block_consensus(&infos, TARGET_VIEW);
let target_block = assert_block_consensus(&blocks, TARGET_VIEW);
// If no node has the target block, check that TARGET_VIEW was reached by timeout_qc.
if target_block.is_none() {
@ -58,13 +63,10 @@ async fn ten_nodes_one_down() {
}
// Check if all nodes have the same block at the specific view.
fn assert_block_consensus<'a>(
consensus_infos: impl IntoIterator<Item = &'a CarnotInfo>,
view: View,
) -> Option<Block> {
let blocks = consensus_infos
fn assert_block_consensus<'a>(blocks: &[Vec<Block>], view: View) -> Option<Block> {
let blocks = blocks
.into_iter()
.map(|i| i.safe_blocks.values().find(|b| b.view == view))
.map(|b| b.iter().find(|b| b.view == view))
.collect::<HashSet<_>>();
// Every nodes must have the same target block (Some(block))
// , or no node must have it (None).