diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index 6d3bd000..e8f87f34 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -1,4 +1,5 @@ mod libp2p; +use consensus_engine::BlockId; use libp2p::*; // std @@ -14,6 +15,7 @@ use tracing::error; use full_replication::{Blob, Certificate}; use nomos_core::wire; use nomos_core::{ + block::Block, da::{blob, certificate::Certificate as _}, tx::Transaction, }; @@ -29,8 +31,9 @@ use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; -use nomos_node::DataAvailability as DataAvailabilityService; use nomos_node::{Carnot, Tx}; +use nomos_node::{DataAvailability as DataAvailabilityService, Wire}; +use nomos_storage::{backends::sled::SledBackend, StorageMsg, StorageService}; use overwatch_rs::services::relay::OutboundRelay; type DaMempoolService = MempoolService< @@ -117,6 +120,14 @@ pub fn cl_mempool_status_bridge( })) } +pub fn storage_get_blocks_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + post_handler!(handle, StorageService>, "block" => handle_block_get_req) + })) +} + pub fn network_info_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, ) -> HttpBridgeRunner { @@ -160,6 +171,22 @@ where Ok(()) } +pub async fn handle_block_get_req( + storage_channel: &OutboundRelay>>, + payload: Option, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> { + let key: BlockId = serde_json::from_slice(payload.unwrap_or_default().as_ref())?; + let (msg, receiver) = StorageMsg::new_load_message(key); + storage_channel.send(msg).await.map_err(|(e, _)| e)?; + let block: Option> = receiver.recv().await?; + res_tx + .send(Ok(serde_json::to_string(&block).unwrap().into())) + .await?; + + Ok(()) +} + pub async fn handle_mempool_status_req( mempool_channel: &OutboundRelay>, payload: Option, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 98395f26..d0cf26e1 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -67,6 +67,7 @@ fn main() -> Result<()> { Arc::new(Box::new(bridges::cl_mempool_status_bridge)), Arc::new(Box::new(bridges::da_mempool_status_bridge)), Arc::new(Box::new(bridges::da_blob_get_bridge)), + Arc::new(Box::new(bridges::storage_get_blocks_bridge)), Arc::new(Box::new(bridges::network_info_bridge)), Arc::new(Box::new( bridges::mempool_add_tx_bridge::::Hash>>, diff --git a/nomos-services/storage/src/lib.rs b/nomos-services/storage/src/lib.rs index 476f72a0..b2754f03 100644 --- a/nomos-services/storage/src/lib.rs +++ b/nomos-services/storage/src/lib.rs @@ -59,19 +59,23 @@ impl StorageReplyReceiver { } } -impl StorageReplyReceiver { +impl StorageReplyReceiver, Backend> { /// Receive and transform the reply into the desired type /// Target type must implement `From` from the original backend stored type. - pub async fn recv(self) -> Result + pub async fn recv( + self, + ) -> Result, tokio::sync::oneshot::error::RecvError> where Output: DeserializeOwned, { self.channel .await // TODO: This should probably just return a result anyway. But for now we can consider in infallible. - .map(|b| { - Backend::SerdeOperator::deserialize(b) - .expect("Recovery from storage should never fail") + .map(|maybe_bytes| { + maybe_bytes.map(|bytes| { + Backend::SerdeOperator::deserialize(bytes) + .expect("Recovery from storage should never fail") + }) }) } } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 8d1de83a..0bc8f7a6 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -5,18 +5,20 @@ use std::time::Duration; // internal use crate::{get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig}; use consensus_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}; -use consensus_engine::{NodeId, Overlay}; +use consensus_engine::{BlockId, NodeId, Overlay}; +use full_replication::Certificate; use mixnet_client::{MixnetClientConfig, MixnetClientMode}; use mixnet_node::MixnetNodeConfig; use mixnet_topology::MixnetTopology; use nomos_consensus::{CarnotInfo, CarnotSettings}; +use nomos_core::block::Block; use nomos_http::backends::axum::AxumBackendSettings; use nomos_libp2p::{multiaddr, Multiaddr}; use nomos_log::{LoggerBackend, LoggerFormat}; use nomos_mempool::MempoolMetrics; use nomos_network::backends::libp2p::Libp2pConfig; use nomos_network::NetworkConfig; -use nomos_node::Config; +use nomos_node::{Config, Tx}; // crates use fraction::Fraction; use once_cell::sync::Lazy; @@ -27,6 +29,7 @@ use tempfile::NamedTempFile; static CLIENT: Lazy = Lazy::new(Client::new); const NOMOS_BIN: &str = "../target/debug/nomos-node"; const CARNOT_INFO_API: &str = "carnot/info"; +const STORAGE_BLOCKS_API: &str = "storage/block"; const MEMPOOL_API: &str = "mempool-"; const LOGS_PREFIX: &str = "__logs"; @@ -96,11 +99,27 @@ impl NomosNode { } async fn wait_online(&self) { - while self.get(CARNOT_INFO_API).await.is_err() { + loop { + let res = self.get(CARNOT_INFO_API).await; + if res.is_ok() && res.unwrap().status().is_success() { + break; + } tokio::time::sleep(Duration::from_millis(100)).await; } } + pub async fn get_block(&self, id: BlockId) -> Option> { + CLIENT + .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) + .body(serde_json::to_string(&id).unwrap()) + .send() + .await + .unwrap() + .json::>>() + .await + .unwrap() + } + pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics { let discr = match pool { Pool::Cl => "cl", @@ -185,12 +204,8 @@ impl Node for NomosNode { } async fn consensus_info(&self) -> Self::ConsensusInfo { - self.get(CARNOT_INFO_API) - .await - .unwrap() - .json() - .await - .unwrap() + let res = self.get(CARNOT_INFO_API).await; + res.unwrap().json().await.unwrap() } fn stop(&mut self) { diff --git a/tests/src/tests/happy.rs b/tests/src/tests/happy.rs index a314d287..359877d9 100644 --- a/tests/src/tests/happy.rs +++ b/tests/src/tests/happy.rs @@ -14,18 +14,18 @@ struct Info { view: View, } -async fn happy_test(nodes: Vec) { +async fn happy_test(nodes: &[NomosNode]) { let timeout = std::time::Duration::from_secs(20); let timeout = tokio::time::sleep(timeout); tokio::select! { _ = timeout => panic!("timed out waiting for nodes to reach view {}", TARGET_VIEW), - _ = async { while stream::iter(&nodes) + _ = async { while stream::iter(nodes) .any(|n| async move { n.consensus_info().await.current_view < TARGET_VIEW }) .await { println!( "waiting... {}", - stream::iter(&nodes) + stream::iter(nodes) .then(|n| async move { format!("{}", n.consensus_info().await.current_view) }) .collect::>() .await @@ -86,7 +86,7 @@ async fn two_nodes_happy() { mixnet: mixnet_config, }) .await; - happy_test(nodes).await; + happy_test(&nodes).await; } #[tokio::test] @@ -101,5 +101,29 @@ async fn ten_nodes_happy() { mixnet: mixnet_config, }) .await; - happy_test(nodes).await; + happy_test(&nodes).await; +} + +#[tokio::test] +async fn test_get_block() { + let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await; + let nodes = NomosNode::spawn_nodes(SpawnConfig::Chain { + consensus: ConsensusConfig { + n_participants: 2, + threshold: Fraction::one(), + timeout: Duration::from_secs(10), + }, + mixnet: mixnet_config, + }) + .await; + happy_test(&nodes).await; + let id = nodes[0].consensus_info().await.committed_blocks[0]; + tokio::time::timeout(Duration::from_secs(10), async { + while nodes[0].get_block(id).await.is_none() { + tokio::time::sleep(Duration::from_millis(100)).await; + println!("trying..."); + } + }) + .await + .unwrap(); }