Add http API to retrieve DA blobs
This commit is contained in:
parent
949517ba06
commit
ef1b923a7f
|
@ -6,7 +6,7 @@ use libp2p::*;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
use nomos_consensus::{CarnotInfo, ConsensusMsg};
|
use nomos_consensus::{CarnotInfo, ConsensusMsg};
|
||||||
use serde::de::DeserializeOwned;
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
use tracing::error;
|
use tracing::error;
|
||||||
|
@ -17,6 +17,7 @@ use nomos_core::{
|
||||||
da::{blob, certificate::Certificate as _},
|
da::{blob, certificate::Certificate as _},
|
||||||
tx::Transaction,
|
tx::Transaction,
|
||||||
};
|
};
|
||||||
|
use nomos_da::DaMsg;
|
||||||
use nomos_http::backends::axum::AxumBackend;
|
use nomos_http::backends::axum::AxumBackend;
|
||||||
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
|
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
|
||||||
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
|
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
|
||||||
|
@ -28,6 +29,7 @@ use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
|
||||||
use nomos_network::backends::libp2p::Libp2p;
|
use nomos_network::backends::libp2p::Libp2p;
|
||||||
use nomos_network::backends::NetworkBackend;
|
use nomos_network::backends::NetworkBackend;
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
|
use nomos_node::DataAvailability as DataAvailabilityService;
|
||||||
use nomos_node::{Carnot, Tx};
|
use nomos_node::{Carnot, Tx};
|
||||||
use overwatch_rs::services::relay::OutboundRelay;
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
|
||||||
|
@ -123,6 +125,41 @@ pub fn network_info_bridge(
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn da_blob_get_bridge(
|
||||||
|
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||||
|
) -> HttpBridgeRunner {
|
||||||
|
Box::new(Box::pin(async move {
|
||||||
|
post_handler!(handle, DataAvailabilityService, "blobs" => handle_da_blobs_req)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_da_blobs_req<B>(
|
||||||
|
da_channel: &OutboundRelay<DaMsg<B>>,
|
||||||
|
payload: Option<Bytes>,
|
||||||
|
res_tx: Sender<HttpResponse>,
|
||||||
|
) -> Result<(), overwatch_rs::DynError>
|
||||||
|
where
|
||||||
|
B: blob::Blob + Serialize,
|
||||||
|
B::Hash: DeserializeOwned + Send + 'static,
|
||||||
|
{
|
||||||
|
let (reply_channel, receiver) = oneshot::channel();
|
||||||
|
let ids: Vec<B::Hash> = serde_json::from_slice(payload.unwrap_or_default().as_ref())?;
|
||||||
|
da_channel
|
||||||
|
.send(DaMsg::Get {
|
||||||
|
ids: Box::new(ids.into_iter()),
|
||||||
|
reply_channel,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|(e, _)| e)?;
|
||||||
|
|
||||||
|
let blobs = receiver.await.unwrap();
|
||||||
|
res_tx
|
||||||
|
.send(Ok(serde_json::to_string(&blobs).unwrap().into()))
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle_mempool_status_req<K, V>(
|
pub async fn handle_mempool_status_req<K, V>(
|
||||||
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
|
mempool_channel: &OutboundRelay<MempoolMsg<V, K>>,
|
||||||
payload: Option<Bytes>,
|
payload: Option<Bytes>,
|
||||||
|
|
|
@ -66,7 +66,7 @@ pub type Carnot = CarnotConsensus<
|
||||||
SledBackend<Wire>,
|
SledBackend<Wire>,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
type DataAvailability = DataAvailabilityService<
|
pub type DataAvailability = DataAvailabilityService<
|
||||||
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
|
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
|
||||||
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
|
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
|
||||||
DaLibp2pAdapter<Blob, Attestation>,
|
DaLibp2pAdapter<Blob, Attestation>,
|
||||||
|
|
|
@ -66,6 +66,7 @@ fn main() -> Result<()> {
|
||||||
Arc::new(Box::new(bridges::da_mempool_metrics_bridge)),
|
Arc::new(Box::new(bridges::da_mempool_metrics_bridge)),
|
||||||
Arc::new(Box::new(bridges::cl_mempool_status_bridge)),
|
Arc::new(Box::new(bridges::cl_mempool_status_bridge)),
|
||||||
Arc::new(Box::new(bridges::da_mempool_status_bridge)),
|
Arc::new(Box::new(bridges::da_mempool_status_bridge)),
|
||||||
|
Arc::new(Box::new(bridges::da_blob_get_bridge)),
|
||||||
Arc::new(Box::new(bridges::network_info_bridge)),
|
Arc::new(Box::new(bridges::network_info_bridge)),
|
||||||
Arc::new(Box::new(
|
Arc::new(Box::new(
|
||||||
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,
|
bridges::mempool_add_tx_bridge::<Libp2p, Libp2pAdapter<Tx, <Tx as Transaction>::Hash>>,
|
||||||
|
|
Loading…
Reference in New Issue