diff --git a/nodes/nomos-node/src/bridges/mod.rs b/nodes/nomos-node/src/bridges/mod.rs index a570fad1..6d3bd000 100644 --- a/nodes/nomos-node/src/bridges/mod.rs +++ b/nodes/nomos-node/src/bridges/mod.rs @@ -6,7 +6,7 @@ use libp2p::*; use bytes::Bytes; use http::StatusCode; use nomos_consensus::{CarnotInfo, ConsensusMsg}; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tracing::error; @@ -17,6 +17,7 @@ use nomos_core::{ da::{blob, certificate::Certificate as _}, tx::Transaction, }; +use nomos_da::DaMsg; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner}; use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse}; @@ -28,6 +29,7 @@ use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::NetworkBackend; use nomos_network::NetworkService; +use nomos_node::DataAvailability as DataAvailabilityService; use nomos_node::{Carnot, Tx}; use overwatch_rs::services::relay::OutboundRelay; @@ -123,6 +125,41 @@ pub fn network_info_bridge( })) } +pub fn da_blob_get_bridge( + handle: overwatch_rs::overwatch::handle::OverwatchHandle, +) -> HttpBridgeRunner { + Box::new(Box::pin(async move { + post_handler!(handle, DataAvailabilityService, "blobs" => handle_da_blobs_req) + })) +} + +pub async fn handle_da_blobs_req( + da_channel: &OutboundRelay>, + payload: Option, + res_tx: Sender, +) -> Result<(), overwatch_rs::DynError> +where + B: blob::Blob + Serialize, + B::Hash: DeserializeOwned + Send + 'static, +{ + let (reply_channel, receiver) = oneshot::channel(); + let ids: Vec = serde_json::from_slice(payload.unwrap_or_default().as_ref())?; + da_channel + .send(DaMsg::Get { + ids: Box::new(ids.into_iter()), + reply_channel, + }) + .await + .map_err(|(e, _)| e)?; + + let blobs = receiver.await.unwrap(); + res_tx + .send(Ok(serde_json::to_string(&blobs).unwrap().into())) + .await?; + + Ok(()) +} + pub async fn handle_mempool_status_req( mempool_channel: &OutboundRelay>, payload: Option, diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 50fb100b..116c2608 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -66,7 +66,7 @@ pub type Carnot = CarnotConsensus< SledBackend, >; -type DataAvailability = DataAvailabilityService< +pub type DataAvailability = DataAvailabilityService< FullReplication>, BlobCache<::Hash, Blob>, DaLibp2pAdapter, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 65d76e30..98395f26 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -66,6 +66,7 @@ fn main() -> Result<()> { Arc::new(Box::new(bridges::da_mempool_metrics_bridge)), Arc::new(Box::new(bridges::cl_mempool_status_bridge)), Arc::new(Box::new(bridges::da_mempool_status_bridge)), + Arc::new(Box::new(bridges::da_blob_get_bridge)), Arc::new(Box::new(bridges::network_info_bridge)), Arc::new(Box::new( bridges::mempool_add_tx_bridge::::Hash>>, diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index f35cf411..b0ebc255 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -3,7 +3,6 @@ pub mod network; // std use overwatch_rs::DynError; -use std::collections::HashMap; use std::fmt::{Debug, Formatter}; // crates use futures::StreamExt; @@ -38,7 +37,7 @@ pub enum DaMsg { }, Get { ids: Box::Hash> + Send>, - reply_channel: Sender::Hash, B>>, + reply_channel: Sender>, }, } @@ -168,9 +167,7 @@ where .await; } DaMsg::Get { ids, reply_channel } => { - let res = ids - .filter_map(|id| backend.get_blob(&id).map(|blob| (id, blob))) - .collect(); + let res = ids.filter_map(|id| backend.get_blob(&id)).collect(); if reply_channel.send(res).is_err() { tracing::error!("Could not returns blobs"); }