Add API to revieve DA blobs (#477)

* Use Vec instead of HashMap in DaMsg::Get

* Add http API to retrieve DA blobs
This commit is contained in:
Giacomo Pasini 2023-10-24 15:38:33 +02:00 committed by GitHub
parent 3ce8cacb30
commit 89b8e27612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 7 deletions

View File

@ -6,7 +6,7 @@ use libp2p::*;
use bytes::Bytes;
use http::StatusCode;
use nomos_consensus::{CarnotInfo, ConsensusMsg};
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tracing::error;
@ -17,6 +17,7 @@ use nomos_core::{
da::{blob, certificate::Certificate as _},
tx::Transaction,
};
use nomos_da::DaMsg;
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
@ -28,6 +29,7 @@ use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use nomos_node::DataAvailability as DataAvailabilityService;
use nomos_node::{Carnot, Tx};
use overwatch_rs::services::relay::OutboundRelay;
@ -123,6 +125,41 @@ pub fn network_info_bridge(
}))
}
pub fn da_blob_get_bridge(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
) -> HttpBridgeRunner {
Box::new(Box::pin(async move {
post_handler!(handle, DataAvailabilityService, "blobs" => handle_da_blobs_req)
}))
}
pub async fn handle_da_blobs_req<B>(
da_channel: &OutboundRelay<DaMsg<B>>,
payload: Option<Bytes>,
res_tx: Sender<HttpResponse>,
) -> Result<(), overwatch_rs::DynError>
where
B: blob::Blob + Serialize,
B::Hash: DeserializeOwned + Send + 'static,
{
let (reply_channel, receiver) = oneshot::channel();
let ids: Vec<B::Hash> = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
da_channel
.send(DaMsg::Get {
ids: Box::new(ids.into_iter()),
reply_channel,
})
.await
.map_err(|(e, _)| e)?;
let blobs = receiver.await.unwrap();
res_tx
.send(Ok(serde_json::to_string(&blobs).unwrap().into()))
.await?;
Ok(())
}
pub async fn handle_mempool_status_req<K, V>(
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
payload: Option<Bytes>,

View File

@ -66,7 +66,7 @@ pub type Carnot = CarnotConsensus<
SledBackend<Wire>,
>;
type DataAvailability = DataAvailabilityService<
pub type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
DaLibp2pAdapter<Blob, Attestation>,

View File

@ -66,6 +66,7 @@ fn main() -> Result<()> {
Arc::new(Box::new(bridges::da_mempool_metrics_bridge)),
Arc::new(Box::new(bridges::cl_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_mempool_status_bridge)),
Arc::new(Box::new(bridges::da_blob_get_bridge)),
Arc::new(Box::new(bridges::network_info_bridge)),
Arc::new(Box::new(
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,

View File

@ -3,7 +3,6 @@ pub mod network;
// std
use overwatch_rs::DynError;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
@ -38,7 +37,7 @@ pub enum DaMsg<B: Blob> {
},
Get {
ids: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
reply_channel: Sender<HashMap<<B as Blob>::Hash, B>>,
reply_channel: Sender<Vec<B>>,
},
}
@ -168,9 +167,7 @@ where
.await;
}
DaMsg::Get { ids, reply_channel } => {
let res = ids
.filter_map(|id| backend.get_blob(&id).map(|blob| (id, blob)))
.collect();
let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
if reply_channel.send(res).is_err() {
tracing::error!("Could not returns blobs");
}