diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index b167af83..8cee5b65 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -15,11 +15,13 @@ chrono = "0.4" futures = "0.3" http = "0.2.9" hex = "0.4.3" +kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } +nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } nomos-api = { path = "../../nomos-services/api" } nomos-log = { path = "../../nomos-services/log" } diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 5d53cd51..88313c03 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -1,3 +1,5 @@ +use std::error::Error; +use std::ops::Range; use std::{fmt::Debug, hash::Hash}; use axum::{ @@ -10,6 +12,25 @@ use hyper::{ header::{CONTENT_TYPE, USER_AGENT}, Body, StatusCode, }; +use nomos_api::{ + http::{cl, consensus, da, libp2p, mempool, metrics, storage}, + Backend, +}; +use nomos_core::da::certificate::metadata::Metadata; +use nomos_core::da::{certificate, DaVerifier as CoreDaVerifier}; +use nomos_core::{ + da::{attestation::Attestation, blob::Blob}, + header::HeaderId, + tx::Transaction, +}; +use nomos_da_verifier::backend::VerifierBackend; +use nomos_mempool::verify::MempoolVerificationProvider; +use nomos_mempool::{ + network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, + tx::service::openapi::Status, MempoolMetrics, +}; +use nomos_network::backends::libp2p::Libp2p as NetworkBackend; +use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tower_http::{ @@ -19,19 +40,6 @@ use tower_http::{ use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; -use nomos_core::{header::HeaderId, tx::Transaction}; -use nomos_mempool::{ - network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, - tx::service::openapi::Status, MempoolMetrics, -}; -use nomos_network::backends::libp2p::Libp2p as NetworkBackend; -use nomos_storage::backends::StorageSerde; - -use nomos_api::{ - http::{cl, consensus, libp2p, mempool, metrics, storage}, - Backend, -}; - /// Configuration for the Http Server #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct AxumBackendSettings { @@ -41,8 +49,14 @@ pub struct AxumBackendSettings { pub cors_origins: Vec, } -pub struct AxumBackend { +pub struct AxumBackend { settings: AxumBackendSettings, + _attestation: core::marker::PhantomData, + _blob: core::marker::PhantomData, + _certificate: core::marker::PhantomData, + _vid: core::marker::PhantomData, + _params_provider: core::marker::PhantomData, + _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, _storage_serde: core::marker::PhantomData, } @@ -61,8 +75,46 @@ pub struct AxumBackend { struct ApiDoc; #[async_trait::async_trait] -impl Backend for AxumBackend +impl Backend + for AxumBackend where + A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + ::BlobId: AsRef<[u8]> + Send + Sync + 'static, + C: certificate::Certificate + + Clone + + Debug + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::Id: Clone + Send + Sync, + V: certificate::vid::VidCertificate + + From + + Eq + + Debug + + Metadata + + Hash + + Clone + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::CertificateId: Debug + Clone + Ord + Hash, + ::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync, + ::Index: + AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync, + VP: MempoolVerificationProvider< + Payload = C, + Parameters = ::VerificationParameters, + > + Send + + Sync + + 'static, + VB: VerifierBackend + CoreDaVerifier + Send + Sync + 'static, + ::Settings: Clone, + ::Error: Error, T: Transaction + Clone + Debug @@ -86,6 +138,12 @@ where { Ok(Self { settings, + _attestation: core::marker::PhantomData, + _blob: core::marker::PhantomData, + _certificate: core::marker::PhantomData, + _vid: core::marker::PhantomData, + _params_provider: core::marker::PhantomData, + _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, _storage_serde: core::marker::PhantomData, }) @@ -124,6 +182,11 @@ where "/cryptarchia/headers", routing::get(cryptarchia_headers::), ) + .route("/da/add_blob", routing::post(add_blob::)) + .route( + "/da/get_range", + routing::post(get_range::), + ) .route("/network/info", routing::get(libp2p_info)) .route("/storage/block", routing::post(block::)) .route("/mempool/add/tx", routing::post(add_tx::)) @@ -261,6 +324,100 @@ where )) } +#[utoipa::path( + post, + path = "/da/add_blob", + responses( + (status = 200, description = "Attestation for DA blob", body = Option), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn add_blob( + State(handle): State, + Json(blob): Json, +) -> Response +where + A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + ::BlobId: AsRef<[u8]> + Send + Sync + 'static, + VB: VerifierBackend + CoreDaVerifier, + ::Settings: Clone, + ::Error: Error, + SS: StorageSerde + Send + Sync + 'static, +{ + make_request_and_return_response!(da::add_blob::(&handle, blob)) +} + +#[derive(Serialize, Deserialize)] +struct GetRangeReq +where + ::AppId: Serialize + DeserializeOwned, + ::Index: Serialize + DeserializeOwned, +{ + app_id: ::AppId, + range: Range<::Index>, +} + +#[utoipa::path( + post, + path = "/da/get_range", + responses( + (status = 200, description = "Range of blobs", body = Vec<([u8;8], Option)>), + (status = 500, description = "Internal server error", body = String), + ) +)] +async fn get_range( + State(handle): State, + Json(GetRangeReq { app_id, range }): Json>, +) -> Response +where + Tx: Transaction + + Eq + + Clone + + Debug + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, + C: certificate::Certificate + + Clone + + Debug + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::Id: Clone + Send + Sync, + V: certificate::vid::VidCertificate + + From + + Eq + + Debug + + Metadata + + Hash + + Clone + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::CertificateId: Debug + Clone + Ord + Hash, + ::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync, + ::Index: + AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync, + VP: MempoolVerificationProvider< + Payload = C, + Parameters = ::VerificationParameters, + >, + SS: StorageSerde + Send + Sync + 'static, +{ + make_request_and_return_response!(da::get_range::( + &handle, app_id, range + )) +} + #[utoipa::path( get, path = "/network/info", @@ -277,7 +434,7 @@ async fn libp2p_info(State(handle): State) -> Response { get, path = "/storage/block", responses( - (status = 200, description = "Get the block by block id", body = Block), + (status = 200, description = "Get the block by block id", body = Block), (status = 500, description = "Internal server error", body = String), ) )] diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index fa0a2295..cc973c46 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -3,13 +3,11 @@ use std::{ path::PathBuf, }; -use crate::api::AxumBackend; -use crate::{Tx, Wire, MB16}; +use crate::NomosApiService; use clap::{Parser, ValueEnum}; use color_eyre::eyre::{eyre, Result}; use cryptarchia_ledger::Coin; use hex::FromHex; -use nomos_api::ApiService; use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; @@ -116,7 +114,7 @@ pub struct MetricsArgs { pub struct Config { pub log: ::Settings, pub network: as ServiceData>::Settings, - pub http: > as ServiceData>::Settings, + pub http: ::Settings, pub cryptarchia: ::Settings, } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 28d1665f..fefb5974 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -3,16 +3,19 @@ mod config; mod tx; use color_eyre::eyre::Result; -use full_replication::{Certificate, VidCertificate}; +use kzgrs_backend::dispersal::{Certificate, VidCertificate}; use api::AxumBackend; use bytes::Bytes; pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs}; +use kzgrs_backend::common::attestation::Attestation; +use kzgrs_backend::common::blob::DaBlob; use nomos_api::ApiService; use nomos_core::{da::certificate, header::HeaderId, tx::Transaction, wire}; +use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; #[cfg(feature = "tracing")] use nomos_log::Logger; -use nomos_mempool::da::verify::fullreplication::DaVerificationProvider as MempoolVerificationProvider; +use nomos_mempool::da::verify::kzgrs::DaVerificationProvider as MempoolVerificationProvider; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; #[cfg(feature = "metrics")] @@ -38,6 +41,20 @@ use serde::{de::DeserializeOwned, Serialize}; pub use tx::Tx; +pub type NomosApiService = ApiService< + AxumBackend< + Attestation, + DaBlob, + Certificate, + VidCertificate, + MempoolVerificationProvider, + KzgrsDaVerifier, + Tx, + Wire, + MB16, + >, +>; + pub const CL_TOPIC: &str = "cl"; pub const DA_TOPIC: &str = "da"; const MB16: usize = 1024 * 1024 * 16; @@ -81,7 +98,7 @@ pub struct Nomos { cl_mempool: ServiceHandle, da_mempool: ServiceHandle, cryptarchia: ServiceHandle, - http: ServiceHandle>>, + http: ServiceHandle, storage: ServiceHandle>>, #[cfg(feature = "metrics")] metrics: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index dc5181b1..c11ddc7e 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,4 +1,4 @@ -use full_replication::Certificate; +use kzgrs_backend::dispersal::Certificate; #[cfg(feature = "metrics")] use nomos_metrics::MetricsSettings; use nomos_node::{ @@ -80,9 +80,10 @@ fn main() -> Result<()> { topic: String::from(nomos_node::DA_TOPIC), id: ::id, }, - verification_provider: full_replication::CertificateVerificationParameters { - threshold: 0, - }, + verification_provider: + kzgrs_backend::dispersal::CertificateVerificationParameters { + nodes_public_keys: Default::default(), + }, registry: registry.clone(), }, cryptarchia: config.cryptarchia, diff --git a/nomos-da/kzgrs-backend/Cargo.toml b/nomos-da/kzgrs-backend/Cargo.toml index 5c9e7dbe..f78f3394 100644 --- a/nomos-da/kzgrs-backend/Cargo.toml +++ b/nomos-da/kzgrs-backend/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" ark-ff = "0.4" ark-serialize = "0.4.2" ark-poly = "0.4.2" -bitvec = "1.0.1" +bitvec = { version = "1.0.1", features = ["serde"] } blake2 = "0.10" blst = { version = "0.3.11", features = ["serde"] } itertools = "0.12" diff --git a/nomos-da/kzgrs-backend/src/common/blob.rs b/nomos-da/kzgrs-backend/src/common/blob.rs index cde61f5b..cb43474d 100644 --- a/nomos-da/kzgrs-backend/src/common/blob.rs +++ b/nomos-da/kzgrs-backend/src/common/blob.rs @@ -1,16 +1,16 @@ // std -use std::io::Cursor; // crates -use ark_serialize::*; use kzgrs::Proof; use nomos_core::da::blob; -use serde::ser::SerializeSeq; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use sha3::{Digest, Sha3_256}; // internal use super::build_attestation_message; use crate::common::Column; use crate::common::Commitment; +use crate::common::{ + deserialize_canonical, deserialize_vec_canonical, serialize_canonical, serialize_vec_canonical, +}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DaBlob { @@ -44,7 +44,7 @@ pub struct DaBlob { impl DaBlob { pub fn id(&self) -> Vec { - build_attestation_message(&self.aggregated_column_commitment, &self.rows_commitments) + build_attestation_message(&self.aggregated_column_commitment, &self.rows_commitments).into() } pub fn column_id(&self) -> Vec { @@ -58,59 +58,6 @@ impl blob::Blob for DaBlob { type BlobId = Vec; fn id(&self) -> Self::BlobId { - build_attestation_message(&self.aggregated_column_commitment, &self.rows_commitments) + build_attestation_message(&self.aggregated_column_commitment, &self.rows_commitments).into() } } - -fn serialize_canonical(value: &T, serializer: S) -> Result -where - S: Serializer, - T: CanonicalSerialize, -{ - let mut bytes = Vec::new(); - value - .serialize_compressed(&mut bytes) - .map_err(serde::ser::Error::custom)?; - serializer.serialize_bytes(&bytes) -} - -fn deserialize_canonical<'de, D, T>(deserializer: D) -> Result -where - D: Deserializer<'de>, - T: CanonicalDeserialize, -{ - let bytes: Vec = serde::Deserialize::deserialize(deserializer)?; - let mut cursor = Cursor::new(bytes); - T::deserialize_compressed(&mut cursor).map_err(serde::de::Error::custom) -} - -fn serialize_vec_canonical(values: &[T], serializer: S) -> Result -where - S: Serializer, - T: CanonicalSerialize, -{ - let mut container = serializer.serialize_seq(Some(values.len()))?; - for value in values { - let mut bytes = Vec::new(); - value - .serialize_compressed(&mut bytes) - .map_err(serde::ser::Error::custom)?; - container.serialize_element(&bytes)?; - } - container.end() -} - -fn deserialize_vec_canonical<'de, D, T>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, - T: CanonicalDeserialize, -{ - let bytes_vecs: Vec> = Deserialize::deserialize(deserializer)?; - bytes_vecs - .iter() - .map(|bytes| { - let mut cursor = Cursor::new(bytes); - T::deserialize_compressed(&mut cursor).map_err(serde::de::Error::custom) - }) - .collect() -} diff --git a/nomos-da/kzgrs-backend/src/common/mod.rs b/nomos-da/kzgrs-backend/src/common/mod.rs index 0cb816d8..a964e238 100644 --- a/nomos-da/kzgrs-backend/src/common/mod.rs +++ b/nomos-da/kzgrs-backend/src/common/mod.rs @@ -2,8 +2,9 @@ pub mod attestation; pub mod blob; // std -use ark_serialize::CanonicalSerialize; -use serde::{Deserialize, Serialize}; +use ark_serialize::{CanonicalDeserialize, CanonicalSerialize}; +use serde::ser::SerializeSeq; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::io::Cursor; // crates use blake2::digest::{Update, VariableOutput}; @@ -156,7 +157,7 @@ pub fn hash_column_and_commitment( pub fn build_attestation_message( aggregated_column_commitment: &Commitment, rows_commitments: &[Commitment], -) -> Vec { +) -> [u8; 32] { let mut hasher = Sha3_256::new(); Digest::update( &mut hasher, @@ -165,7 +166,7 @@ pub fn build_attestation_message( for c in rows_commitments { Digest::update(&mut hasher, commitment_to_bytes(c)); } - hasher.finalize().to_vec() + hasher.finalize().into() } pub fn commitment_to_bytes(commitment: &Commitment) -> Vec { @@ -175,3 +176,56 @@ pub fn commitment_to_bytes(commitment: &Commitment) -> Vec { .expect("Serialization of commitment should work"); buff.into_inner() } + +pub fn serialize_canonical(value: &T, serializer: S) -> Result +where + S: Serializer, + T: CanonicalSerialize, +{ + let mut bytes = Vec::new(); + value + .serialize_compressed(&mut bytes) + .map_err(serde::ser::Error::custom)?; + serializer.serialize_bytes(&bytes) +} + +pub fn deserialize_canonical<'de, D, T>(deserializer: D) -> Result +where + D: Deserializer<'de>, + T: CanonicalDeserialize, +{ + let bytes: Vec = serde::Deserialize::deserialize(deserializer)?; + let mut cursor = Cursor::new(bytes); + T::deserialize_compressed(&mut cursor).map_err(serde::de::Error::custom) +} + +pub fn serialize_vec_canonical(values: &[T], serializer: S) -> Result +where + S: Serializer, + T: CanonicalSerialize, +{ + let mut container = serializer.serialize_seq(Some(values.len()))?; + for value in values { + let mut bytes = Vec::new(); + value + .serialize_compressed(&mut bytes) + .map_err(serde::ser::Error::custom)?; + container.serialize_element(&bytes)?; + } + container.end() +} + +pub fn deserialize_vec_canonical<'de, D, T>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, + T: CanonicalDeserialize, +{ + let bytes_vecs: Vec> = Deserialize::deserialize(deserializer)?; + bytes_vecs + .iter() + .map(|bytes| { + let mut cursor = Cursor::new(bytes); + T::deserialize_compressed(&mut cursor).map_err(serde::de::Error::custom) + }) + .collect() +} diff --git a/nomos-da/kzgrs-backend/src/dispersal.rs b/nomos-da/kzgrs-backend/src/dispersal.rs index c3fc117d..37237509 100644 --- a/nomos-da/kzgrs-backend/src/dispersal.rs +++ b/nomos-da/kzgrs-backend/src/dispersal.rs @@ -8,22 +8,34 @@ use blst::BLST_ERROR; use kzgrs::{Commitment, KzgRsError}; use nomos_core::da::certificate::metadata::Next; use nomos_core::da::certificate::{self, metadata}; +use serde::{Deserialize, Serialize}; // internal use crate::common::{attestation::Attestation, build_attestation_message, NOMOS_DA_DST}; +use crate::common::{ + deserialize_canonical, deserialize_vec_canonical, serialize_canonical, serialize_vec_canonical, +}; use crate::encoder::EncodedData; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct Certificate { aggregated_signatures: Signature, signers: BitVec, + #[serde( + serialize_with = "serialize_canonical", + deserialize_with = "deserialize_canonical" + )] aggregated_column_commitment: Commitment, + #[serde( + serialize_with = "serialize_vec_canonical", + deserialize_with = "deserialize_vec_canonical" + )] row_commitments: Vec, metadata: Metadata, } impl Certificate { - pub fn id(&self) -> Vec { + pub fn id(&self) -> [u8; 32] { build_attestation_message(&self.aggregated_column_commitment, &self.row_commitments) } @@ -90,6 +102,12 @@ impl Certificate { } } +impl Hash for Certificate { + fn hash(&self, state: &mut H) { + state.write(::id(self).as_ref()); + } +} + fn aggregate_signatures(signatures: Vec) -> Result { let refs: Vec<&Signature> = signatures.iter().collect(); AggregateSignature::aggregate(&refs, true).map(|agg_sig| agg_sig.to_signature()) @@ -111,7 +129,7 @@ pub struct CertificateVerificationParameters { impl certificate::Certificate for Certificate { type Signature = Signature; - type Id = Vec; + type Id = [u8; 32]; type VerificationParameters = CertificateVerificationParameters; fn signers(&self) -> Vec { @@ -131,10 +149,10 @@ impl certificate::Certificate for Certificate { } } -#[derive(Copy, Clone, Default, Debug, Ord, PartialOrd, PartialEq, Eq)] +#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)] pub struct Index([u8; 8]); -#[derive(Default, Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Default, Debug, Copy, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct Metadata { app_id: [u8; 32], index: Index, @@ -146,17 +164,17 @@ impl Metadata { } } -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct VidCertificate { - id: Vec, + id: [u8; 32], metadata: Metadata, } impl certificate::vid::VidCertificate for VidCertificate { - type CertificateId = Vec; + type CertificateId = [u8; 32]; fn certificate_id(&self) -> Self::CertificateId { - self.id.clone() + self.id } fn size(&self) -> usize { diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index acab9f8c..883be1e6 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -9,6 +9,7 @@ axum = ["dep:axum", "dep:hyper", "dep:tower-http", "utoipa-swagger-ui/axum"] [dependencies] async-trait = "0.1" +bytes = "1.2" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" @@ -21,9 +22,12 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "openapi", ] } nomos-metrics = { path = "../../nomos-services/metrics" } +nomos-da-indexer = { path = "../data-availability/indexer", features = ["rocksdb-backend"] } +nomos-da-verifier = { path = "../data-availability/verifier", features = ["rocksdb-backend", "libp2p"] } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } +kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } serde = { version = "1", features = ["derive"] } tokio = { version = "1.33", default-features = false, features = ["sync"] } diff --git a/nomos-services/api/src/http/consensus/cryptarchia.rs b/nomos-services/api/src/http/consensus/cryptarchia.rs index ba7a7cfe..b2929c6a 100644 --- a/nomos-services/api/src/http/consensus/cryptarchia.rs +++ b/nomos-services/api/src/http/consensus/cryptarchia.rs @@ -9,7 +9,7 @@ use cryptarchia_consensus::{ network::adapters::libp2p::LibP2pAdapter as ConsensusNetworkAdapter, ConsensusMsg, CryptarchiaConsensus, CryptarchiaInfo, }; -use full_replication::{Certificate, VidCertificate}; +use kzgrs_backend::dispersal::{Certificate, VidCertificate}; use nomos_core::{ da::certificate::{self, select::FillSize as FillSizeWithBlobsCertificate}, header::HeaderId, @@ -17,7 +17,7 @@ use nomos_core::{ }; use nomos_mempool::{ backend::mockpool::MockPool, - da::verify::fullreplication::DaVerificationProvider as MempoolVerificationProvider, + da::verify::kzgrs::DaVerificationProvider as MempoolVerificationProvider, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, }; use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde}; diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs new file mode 100644 index 00000000..bce1d039 --- /dev/null +++ b/nomos-services/api/src/http/da.rs @@ -0,0 +1,142 @@ +use bytes::Bytes; +use core::ops::Range; +use nomos_core::da::attestation::Attestation; +use nomos_core::da::blob::Blob; +use nomos_core::da::certificate::metadata::Metadata; +use nomos_core::da::certificate::{self, select::FillSize as FillSizeWithBlobsCertificate}; +use nomos_core::da::DaVerifier as CoreDaVerifier; +use nomos_core::header::HeaderId; +use nomos_core::tx::{select::FillSize as FillSizeWithTx, Transaction}; +use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter; +use nomos_da_indexer::DaMsg; +use nomos_da_indexer::{ + consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter, DataIndexerService, +}; +use nomos_da_verifier::backend::VerifierBackend; +use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter; +use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; +use nomos_da_verifier::{DaVerifierMsg, DaVerifierService}; +use nomos_mempool::backend::mockpool::MockPool; +use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; +use nomos_mempool::verify::MempoolVerificationProvider; +use nomos_storage::backends::rocksdb::RocksBackend; +use nomos_storage::backends::StorageSerde; +use overwatch_rs::overwatch::handle::OverwatchHandle; +use overwatch_rs::DynError; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::error::Error; +use std::fmt::Debug; +use std::hash::Hash; +use tokio::sync::oneshot; + +pub type DaIndexer = DataIndexerService< + // Indexer specific. + Bytes, + IndexerStorageAdapter, + CryptarchiaConsensusAdapter, + // Cryptarchia specific, should be the same as in `Cryptarchia` type above. + cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, + MockPool::Hash>, + MempoolNetworkAdapter::Hash>, + MockPool, + MempoolNetworkAdapter::Id>, + VP, + FillSizeWithTx, + FillSizeWithBlobsCertificate, + RocksBackend, +>; + +pub type DaVerifier = + DaVerifierService, VerifierStorageAdapter>; + +pub async fn add_blob( + handle: &OverwatchHandle, + blob: B, +) -> Result, DynError> +where + A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, + ::BlobId: AsRef<[u8]> + Send + Sync + 'static, + VB: VerifierBackend + CoreDaVerifier, + ::Settings: Clone, + ::Error: Error, + SS: StorageSerde + Send + Sync + 'static, +{ + let relay = handle.relay::>().connect().await?; + let (sender, receiver) = oneshot::channel(); + relay + .send(DaVerifierMsg::AddBlob { + blob, + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) +} + +pub async fn get_range( + handle: &OverwatchHandle, + app_id: ::AppId, + range: Range<::Index>, +) -> Result::Index, Option)>, DynError> +where + Tx: Transaction + + Eq + + Clone + + Debug + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, + C: certificate::Certificate + + Clone + + Debug + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::Id: Clone + Send + Sync, + V: certificate::vid::VidCertificate + + From + + Eq + + Debug + + Metadata + + Hash + + Clone + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ::CertificateId: Debug + Clone + Ord + Hash, + ::AppId: AsRef<[u8]> + Serialize + Clone + Send + Sync, + ::Index: + AsRef<[u8]> + Serialize + DeserializeOwned + Clone + PartialOrd + Send + Sync, + VP: MempoolVerificationProvider< + Payload = C, + Parameters = ::VerificationParameters, + >, + SS: StorageSerde + Send + Sync + 'static, +{ + let relay = handle + .relay::>() + .connect() + .await?; + let (sender, receiver) = oneshot::channel(); + relay + .send(DaMsg::GetRange { + app_id, + range, + reply_channel: sender, + }) + .await + .map_err(|(e, _)| e)?; + + Ok(receiver.await?) +} diff --git a/nomos-services/api/src/http/mod.rs b/nomos-services/api/src/http/mod.rs index d63a1021..6ab13b2a 100644 --- a/nomos-services/api/src/http/mod.rs +++ b/nomos-services/api/src/http/mod.rs @@ -1,6 +1,7 @@ pub type DynError = Box; pub mod cl; pub mod consensus; +pub mod da; pub mod libp2p; pub mod mempool; pub mod metrics; diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index 16dcacfb..73fd1915 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -14,6 +14,7 @@ nomos-metrics = { path = "../../nomos-services/metrics" } nomos-network = { path = "../network" } nomos-core = { path = "../../nomos-core" } full-replication = { path = "../../nomos-da/full-replication" } +kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } diff --git a/nomos-services/mempool/src/da/verify/kzgrs.rs b/nomos-services/mempool/src/da/verify/kzgrs.rs new file mode 100644 index 00000000..b7336ada --- /dev/null +++ b/nomos-services/mempool/src/da/verify/kzgrs.rs @@ -0,0 +1,22 @@ +use kzgrs_backend::dispersal::{Certificate, CertificateVerificationParameters}; + +use crate::verify::MempoolVerificationProvider; + +pub struct DaVerificationProvider { + settings: CertificateVerificationParameters, +} + +#[async_trait::async_trait] +impl MempoolVerificationProvider for DaVerificationProvider { + type Payload = Certificate; + type Parameters = CertificateVerificationParameters; + type Settings = CertificateVerificationParameters; + + fn new(settings: Self::Settings) -> Self { + Self { settings } + } + + async fn get_parameters(&self, _: &Self::Payload) -> Self::Parameters { + self.settings.clone() + } +} diff --git a/nomos-services/mempool/src/da/verify/mod.rs b/nomos-services/mempool/src/da/verify/mod.rs index fc98c3dc..59ac68a1 100644 --- a/nomos-services/mempool/src/da/verify/mod.rs +++ b/nomos-services/mempool/src/da/verify/mod.rs @@ -1 +1,2 @@ pub mod fullreplication; +pub mod kzgrs; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 88bb4a87..4968d630 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -17,6 +17,7 @@ cryptarchia-engine = { path = "../consensus/cryptarchia-engine", features = ["se cryptarchia-ledger = { path = "../ledger/cryptarchia-ledger", features = ["serde"] } nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] } full-replication = { path = "../nomos-da/full-replication" } +kzgrs-backend = { path = "../nomos-da/kzgrs-backend" } rand = "0.8" once_cell = "1" secp256k1 = { version = "0.26", features = ["rand"] } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 06816afd..47e17777 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -7,7 +7,7 @@ use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node}; use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig}; use cryptarchia_ledger::{Coin, LedgerState}; -use full_replication::Certificate; +use kzgrs_backend::dispersal::Certificate; #[cfg(feature = "mixnet")] use mixnet::{ address::NodeAddress,