Storage api (#473)

* Change impl of StorageReceiver to Option<Bytes>

Load and remove messages return Option<Bytes> and not Bytes, so
let's change the implementation to work around that.

* Add storage/block http api to retrieve blocks from storage

* add tests for storage/block api

* debug tests

* tweak test node online condition
This commit is contained in:
Giacomo Pasini 2023-10-25 12:46:26 +02:00 committed by GitHub
parent 75b36020c2
commit 2f9ebbd32f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 91 additions and 20 deletions

View File

@ -1,4 +1,5 @@
mod libp2p;
use consensus_engine::BlockId;
use libp2p::*;
// std
@ -14,6 +15,7 @@ use tracing::error;
use full_replication::{Blob, Certificate};
use nomos_core::wire;
use nomos_core::{
block::Block,
da::{blob, certificate::Certificate as _},
tx::Transaction,
};
@ -29,8 +31,9 @@ use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use nomos_node::DataAvailability as DataAvailabilityService;
use nomos_node::{Carnot, Tx};
use nomos_node::{DataAvailability as DataAvailabilityService, Wire};
use nomos_storage::{backends::sled::SledBackend, StorageMsg, StorageService};
use overwatch_rs::services::relay::OutboundRelay;
type DaMempoolService = MempoolService<
@ -117,6 +120,14 @@ pub fn cl_mempool_status_bridge(
}))
}
pub fn storage_get_blocks_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, StorageService<SledBackend<Wire>>, "block" => handle_block_get_req)
}))
}
pub fn network_info_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
@ -160,6 +171,22 @@ where
Ok(())
}
pub async fn handle_block_get_req(
storage_channel: &OutboundRelay<StorageMsg<SledBackend<Wire>>>,
payload: Option<Bytes>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError> {
let key: BlockId = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
let (msg, receiver) = StorageMsg::new_load_message(key);
storage_channel.send(msg).await.map_err(|(e, _)| e)?;
let block: Option<Block<Tx, Certificate>> = receiver.recv().await?;
res_tx
.send(Ok(serde_json::to_string(&block).unwrap().into()))
.await?;
Ok(())
}
pub async fn handle_mempool_status_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
payload: Option<Bytes>,

View File

@ -67,6 +67,7 @@ fn main() -> Result<()> {
Arc::new(Box::new(bridges::cl_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_blob_get_bridge)),
Arc::new(Box::new(bridges::storage_get_blocks_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,

View File

@ -59,20 +59,24 @@ impl<T, Backend> StorageReplyReceiver<T, Backend> {
}
}
impl<Backend: StorageBackend> StorageReplyReceiver<Bytes, Backend> {
impl<Backend: StorageBackend> StorageReplyReceiver<Option<Bytes>, Backend> {
/// Receive and transform the reply into the desired type
/// Target type must implement `From` from the original backend stored type.
pub async fn recv<Output>(self) -> Result<Output, tokio::sync::oneshot::error::RecvError>
pub async fn recv<Output>(
self,
) -> Result<Option<Output>, tokio::sync::oneshot::error::RecvError>
where
Output: DeserializeOwned,
{
self.channel
.await
// TODO: This should probably just return a result anyway. But for now we can consider in infallible.
.map(|b| {
Backend::SerdeOperator::deserialize(b)
.map(|maybe_bytes| {
maybe_bytes.map(|bytes| {
Backend::SerdeOperator::deserialize(bytes)
.expect("Recovery from storage should never fail")
})
})
}
}

View File

@ -5,18 +5,20 @@ use std::time::Duration;
// internal
use crate::{get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig};
use consensus_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings};
use consensus_engine::{NodeId, Overlay};
use consensus_engine::{BlockId, NodeId, Overlay};
use full_replication::Certificate;
use mixnet_client::{MixnetClientConfig, MixnetClientMode};
use mixnet_node::MixnetNodeConfig;
use mixnet_topology::MixnetTopology;
use nomos_consensus::{CarnotInfo, CarnotSettings};
use nomos_core::block::Block;
use nomos_http::backends::axum::AxumBackendSettings;
use nomos_libp2p::{multiaddr, Multiaddr};
use nomos_log::{LoggerBackend, LoggerFormat};
use nomos_mempool::MempoolMetrics;
use nomos_network::backends::libp2p::Libp2pConfig;
use nomos_network::NetworkConfig;
use nomos_node::Config;
use nomos_node::{Config, Tx};
// crates
use fraction::Fraction;
use once_cell::sync::Lazy;
@ -27,6 +29,7 @@ use tempfile::NamedTempFile;
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const CARNOT_INFO_API: &str = "carnot/info";
const STORAGE_BLOCKS_API: &str = "storage/block";
const MEMPOOL_API: &str = "mempool-";
const LOGS_PREFIX: &str = "__logs";
@ -96,11 +99,27 @@ impl NomosNode {
}
async fn wait_online(&self) {
while self.get(CARNOT_INFO_API).await.is_err() {
loop {
let res = self.get(CARNOT_INFO_API).await;
if res.is_ok() && res.unwrap().status().is_success() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.body(serde_json::to_string(&id).unwrap())
.send()
.await
.unwrap()
.json::<Option<Block<Tx, Certificate>>>()
.await
.unwrap()
}
pub async fn get_mempoool_metrics(&self, pool: Pool) -> MempoolMetrics {
let discr = match pool {
Pool::Cl => "cl",
@ -185,12 +204,8 @@ impl Node for NomosNode {
}
async fn consensus_info(&self) -> Self::ConsensusInfo {
self.get(CARNOT_INFO_API)
.await
.unwrap()
.json()
.await
.unwrap()
let res = self.get(CARNOT_INFO_API).await;
res.unwrap().json().await.unwrap()
}
fn stop(&mut self) {

View File

@ -14,18 +14,18 @@ struct Info {
view: View,
}
async fn happy_test(nodes: Vec<NomosNode>) {
async fn happy_test(nodes: &[NomosNode]) {
let timeout = std::time::Duration::from_secs(20);
let timeout = tokio::time::sleep(timeout);
tokio::select! {
_ = timeout => panic!("timed out waiting for nodes to reach view {}", TARGET_VIEW),
_ = async { while stream::iter(&nodes)
_ = async { while stream::iter(nodes)
.any(|n| async move { n.consensus_info().await.current_view < TARGET_VIEW })
.await
{
println!(
"waiting... {}",
stream::iter(&nodes)
stream::iter(nodes)
.then(|n| async move { format!("{}", n.consensus_info().await.current_view) })
.collect::<Vec<_>>()
.await
@ -86,7 +86,7 @@ async fn two_nodes_happy() {
mixnet: mixnet_config,
})
.await;
happy_test(nodes).await;
happy_test(&nodes).await;
}
#[tokio::test]
@ -101,5 +101,29 @@ async fn ten_nodes_happy() {
mixnet: mixnet_config,
})
.await;
happy_test(nodes).await;
happy_test(&nodes).await;
}
#[tokio::test]
async fn test_get_block() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::Chain {
consensus: ConsensusConfig {
n_participants: 2,
threshold: Fraction::one(),
timeout: Duration::from_secs(10),
},
mixnet: mixnet_config,
})
.await;
happy_test(&nodes).await;
let id = nodes[0].consensus_info().await.committed_blocks[0];
tokio::time::timeout(Duration::from_secs(10), async {
while nodes[0].get_block(id).await.is_none() {
tokio::time::sleep(Duration::from_millis(100)).await;
println!("trying...");
}
})
.await
.unwrap();
}