DA: DispersedBlobInfo trait in nomos-core and kzgrs-backend (#686)

* DispersedBlobInfo in nomos-core

* Implement BlobInfo in kzgrs-backend

* Full replication BlobInfo

* Use blob info in da mempool

* Blob info in consensus

* Blob info in da-verifier

* BlobInfo in da indexer

* BlobInfo in nomos-api

* Verifier and Indexer integration tests with BlobInfo

* Blob info in nomos node

* Import DaEncoder trait
Authored by gusto on 2024-08-02 20:01:18 +03:00, committed by GitHub
parent e380bf4117 · commit 9ad4ee7a3c
43 changed files with 327 additions and 1326 deletions

View File

@ -16,15 +16,11 @@ use nomos_api::{
http::{cl, consensus, da, libp2p, mempool, metrics, storage},
Backend,
};
use nomos_core::da::certificate::metadata::Metadata;
use nomos_core::da::{certificate, DaVerifier as CoreDaVerifier};
use nomos_core::{
da::{attestation::Attestation, blob::Blob},
header::HeaderId,
tx::Transaction,
};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_verifier::backend::VerifierBackend;
use nomos_mempool::verify::MempoolVerificationProvider;
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status, MempoolMetrics,
@ -49,13 +45,12 @@ pub struct AxumBackendSettings {
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<A, B, C, V, VP, VB, T, S, const SIZE: usize> {
pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<A>,
_blob: core::marker::PhantomData<B>,
_certificate: core::marker::PhantomData<C>,
_vid: core::marker::PhantomData<V>,
_params_provider: core::marker::PhantomData<VP>,
_verifier_backend: core::marker::PhantomData<VB>,
_tx: core::marker::PhantomData<T>,
_storage_serde: core::marker::PhantomData<S>,
@ -75,13 +70,12 @@ pub struct AxumBackend<A, B, C, V, VP, VB, T, S, const SIZE: usize> {
struct ApiDoc;
#[async_trait::async_trait]
impl<A, B, C, V, VP, VB, T, S, const SIZE: usize> Backend
for AxumBackend<A, B, C, V, VP, VB, T, S, SIZE>
impl<A, B, C, V, VB, T, S, const SIZE: usize> Backend for AxumBackend<A, B, C, V, VB, T, S, SIZE>
where
A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
C: certificate::Certificate<Id = [u8; 32]>
C: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
@ -89,8 +83,8 @@ where
+ Send
+ Sync
+ 'static,
<C as certificate::Certificate>::Id: Clone + Send + Sync,
V: certificate::vid::VidCertificate<CertificateId = [u8; 32]>
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
V: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<C>
+ Eq
+ Debug
@ -102,17 +96,11 @@ where
+ Send
+ Sync
+ 'static,
<V as certificate::vid::VidCertificate>::CertificateId: Debug + Clone + Ord + Hash,
<V as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<V as Metadata>::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<V as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
VP: MempoolVerificationProvider<
Payload = C,
Parameters = <C as certificate::Certificate>::VerificationParameters,
> + Send
+ Sync
+ 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B, Attestation = A> + Send + Sync + 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B> + Send + Sync + 'static,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,
T: Transaction
@ -142,7 +130,6 @@ where
_blob: core::marker::PhantomData,
_certificate: core::marker::PhantomData,
_vid: core::marker::PhantomData,
_params_provider: core::marker::PhantomData,
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
@ -185,7 +172,7 @@ where
.route("/da/add_blob", routing::post(add_blob::<A, B, VB, S>))
.route(
"/da/get_range",
routing::post(get_range::<T, C, V, VP, S, SIZE>),
routing::post(get_range::<T, C, V, S, SIZE>),
)
.route("/network/info", routing::get(libp2p_info))
.route("/storage/block", routing::post(block::<S, T>))
@ -337,10 +324,10 @@ async fn add_blob<A, B, VB, SS>(
Json(blob): Json<B>,
) -> Response
where
A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B, Attestation = A>,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,
SS: StorageSerde + Send + Sync + 'static,
@ -366,7 +353,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_range<Tx, C, V, VP, SS, const SIZE: usize>(
async fn get_range<Tx, C, V, SS, const SIZE: usize>(
State(handle): State<OverwatchHandle>,
Json(GetRangeReq { app_id, range }): Json<GetRangeReq<V>>,
) -> Response
@ -382,7 +369,7 @@ where
+ Sync
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
C: certificate::Certificate<Id = [u8; 32]>
C: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
@ -390,8 +377,8 @@ where
+ Send
+ Sync
+ 'static,
<C as certificate::Certificate>::Id: Clone + Send + Sync,
V: certificate::vid::VidCertificate<CertificateId = [u8; 32]>
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
V: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<C>
+ Eq
+ Debug
@ -403,19 +390,13 @@ where
+ Send
+ Sync
+ 'static,
<V as certificate::vid::VidCertificate>::CertificateId: Debug + Clone + Ord + Hash,
<V as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<V as Metadata>::AppId: AsRef<[u8]> + Clone + Serialize + DeserializeOwned + Send + Sync,
<V as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
VP: MempoolVerificationProvider<
Payload = C,
Parameters = <C as certificate::Certificate>::VerificationParameters,
>,
SS: StorageSerde + Send + Sync + 'static,
{
make_request_and_return_response!(da::get_range::<Tx, C, V, VP, SS, SIZE>(
&handle, app_id, range
))
make_request_and_return_response!(da::get_range::<Tx, C, V, SS, SIZE>(&handle, app_id, range))
}
#[utoipa::path(
@ -434,7 +415,7 @@ async fn libp2p_info(State(handle): State<OverwatchHandle>) -> Response {
get,
path = "/storage/block",
responses(
(status = 200, description = "Get the block by block id", body = Block<Tx, kzgrs_backend::dispersal::Certificate>),
(status = 200, description = "Get the block by block id", body = Block<Tx, kzgrs_backend::dispersal::BlobInfo>),
(status = 500, description = "Internal server error", body = String),
)
)]

View File

@ -1,25 +1,23 @@
use bytes::Bytes;
use color_eyre::eyre::Result;
use kzgrs_backend::dispersal::BlobInfo;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use serde::{de::DeserializeOwned, Serialize};
use api::AxumBackend;
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
use kzgrs_backend::common::attestation::Attestation;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::dispersal::{Certificate, VidCertificate};
use nomos_api::ApiService;
use nomos_core::{da::certificate, header::HeaderId, tx::Transaction, wire};
use nomos_core::da::blob::info::DispersedBlobInfo;
pub use nomos_core::{
da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx,
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
use nomos_core::{header::HeaderId, tx::Transaction, wire};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
#[cfg(feature = "tracing")]
use nomos_log::Logger;
use nomos_mempool::da::service::DaMempoolService;
use nomos_mempool::da::verify::kzgrs::DaVerificationProvider as MempoolVerificationProvider;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
#[cfg(feature = "metrics")]
@ -37,37 +35,21 @@ pub mod api;
mod config;
mod tx;
pub type NomosApiService = ApiService<
AxumBackend<
Attestation,
DaBlob,
Certificate,
VidCertificate,
MempoolVerificationProvider,
KzgrsDaVerifier,
Tx,
Wire,
MB16,
>,
>;
pub type NomosApiService =
ApiService<AxumBackend<(), DaBlob, BlobInfo, BlobInfo, KzgrsDaVerifier, Tx, Wire, MB16>>;
pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
const MB16: usize = 1024 * 1024 * 16;
pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, VidCertificate>,
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, VidCertificate>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
>;
@ -77,13 +59,8 @@ pub type TxMempool = TxMempoolService<
>;
pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolVerificationProvider,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
>;
#[derive(Services)]

View File

@ -1,4 +1,4 @@
use kzgrs_backend::dispersal::Certificate;
use kzgrs_backend::dispersal::BlobInfo;
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use nomos_node::{
@ -8,7 +8,7 @@ use nomos_node::{
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_core::{da::certificate, tx::Transaction};
use nomos_core::{da::blob::info::DispersedBlobInfo, tx::Transaction};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
@ -86,12 +86,8 @@ fn main() -> Result<()> {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <Certificate as certificate::Certificate>::id,
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
verification_provider:
kzgrs_backend::dispersal::CertificateVerificationParameters {
nodes_public_keys: Default::default(),
},
registry: registry.clone(),
},
cryptarchia: config.cryptarchia,

View File

@ -1,5 +1,5 @@
use super::CLIENT;
use full_replication::Certificate;
use full_replication::BlobInfo;
use nomos_core::block::Block;
use nomos_core::header::HeaderId;
use nomos_node::Tx;
@ -8,7 +8,7 @@ use reqwest::Url;
pub async fn get_block_contents(
node: &Url,
block: &HeaderId,
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
) -> Result<Option<Block<Tx, BlobInfo>>, reqwest::Error> {
const BLOCK_PATH: &str = "storage/block";
CLIENT
.post(node.join(BLOCK_PATH).unwrap())

View File

@ -7,8 +7,7 @@ use serde::Serialize;
// internal
use crate::block::Block;
use crate::crypto::Blake2b;
use crate::da::certificate::vid::VidCertificate;
use crate::da::certificate::BlobCertificateSelect;
use crate::da::blob::{info::DispersedBlobInfo, BlobSelect};
use crate::header::{
carnot::Builder as CarnotBuilder, cryptarchia::Builder as CryptarchiaBuilder, Header, HeaderId,
};
@ -61,12 +60,12 @@ where
}
}
impl<Tx, C, TxSelector, BlobSelector> BlockBuilder<Tx, C, TxSelector, BlobSelector>
impl<Tx, B, TxSelector, BlobSelector> BlockBuilder<Tx, B, TxSelector, BlobSelector>
where
Tx: Transaction + Clone + Eq + Hash + Serialize + DeserializeOwned,
C: VidCertificate + Clone + Eq + Hash + Serialize + DeserializeOwned,
B: DispersedBlobInfo + Clone + Eq + Hash + Serialize + DeserializeOwned,
TxSelector: TxSelect<Tx = Tx>,
BlobSelector: BlobCertificateSelect<Certificate = C>,
BlobSelector: BlobSelect<BlobId = B>,
{
pub fn new(tx_selector: TxSelector, blob_selector: BlobSelector) -> Self {
Self {
@ -103,14 +102,14 @@ where
#[must_use]
pub fn with_blobs_certificates(
mut self,
blobs_certificates: impl Iterator<Item = C> + 'static,
blobs_certificates: impl Iterator<Item = B> + 'static,
) -> Self {
self.blobs = Some(Box::new(blobs_certificates));
self
}
#[allow(clippy::result_large_err)]
pub fn build(self) -> Result<Block<Tx, C>, String> {
pub fn build(self) -> Result<Block<Tx, B>, String> {
if let Self {
tx_selector,
blob_selector,

View File

@ -1,9 +0,0 @@
use std::hash::Hash;
pub trait Attestation {
type Hash: Hash + Eq + Clone;
fn blob_hash(&self) -> Self::Hash;
fn hash(&self) -> Self::Hash;
fn signature(&self) -> &[u8];
}

View File

@ -0,0 +1,8 @@
use super::metadata::Metadata;
pub trait DispersedBlobInfo: Metadata {
type BlobId;
fn blob_id(&self) -> Self::BlobId;
fn size(&self) -> usize;
}
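To illustrate the consumer side of the new trait, here is a minimal sketch; the helper functions are hypothetical and not part of this change, but they mirror how the consensus service maps block blobs through DispersedBlobInfo::blob_id and how the FillSize selector sums size().

use nomos_core::da::blob::info::DispersedBlobInfo;

// Collect the blob ids of a batch of dispersed-blob infos.
fn collect_ids<I: DispersedBlobInfo>(infos: &[I]) -> Vec<I::BlobId> {
    infos.iter().map(DispersedBlobInfo::blob_id).collect()
}

// Sum the advertised sizes, as the FillSize selector does internally.
fn total_size<I: DispersedBlobInfo>(infos: &[I]) -> usize {
    infos.iter().map(DispersedBlobInfo::size).sum()
}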

View File

@ -1,5 +1,20 @@
pub mod info;
pub mod metadata;
pub mod select;
pub trait Blob {
type BlobId;
fn id(&self) -> Self::BlobId;
}
pub trait BlobSelect {
type BlobId: info::DispersedBlobInfo;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_blob_from<'i, I: Iterator<Item = Self::BlobId> + 'i>(
&self,
certificates: I,
) -> impl Iterator<Item = Self::BlobId> + 'i;
}

View File

@ -3,7 +3,7 @@ use std::marker::PhantomData;
// crates
// internal
use crate::da::certificate::{vid::VidCertificate, BlobCertificateSelect};
use crate::da::blob::{info::DispersedBlobInfo, BlobSelect};
use crate::utils;
#[derive(Default, Clone, Copy)]
@ -19,8 +19,8 @@ impl<const SIZE: usize, B> FillSize<SIZE, B> {
}
}
impl<const SIZE: usize, C: VidCertificate> BlobCertificateSelect for FillSize<SIZE, C> {
type Certificate = C;
impl<const SIZE: usize, B: DispersedBlobInfo> BlobSelect for FillSize<SIZE, B> {
type BlobId = B;
type Settings = ();
@ -28,13 +28,10 @@ impl<const SIZE: usize, C: VidCertificate> BlobCertificateSelect for FillSize<SI
FillSize::new()
}
fn select_blob_from<'i, I: Iterator<Item = Self::Certificate> + 'i>(
fn select_blob_from<'i, I: Iterator<Item = Self::BlobId> + 'i>(
&self,
certificates: I,
) -> impl Iterator<Item = Self::Certificate> + 'i {
utils::select::select_from_till_fill_size::<SIZE, Self::Certificate>(
|c| c.size(),
certificates,
)
) -> impl Iterator<Item = Self::BlobId> + 'i {
utils::select::select_from_till_fill_size::<SIZE, Self::BlobId>(|c| c.size(), certificates)
}
}
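As a usage sketch (not part of the diff), the reworked selector can be driven directly through the BlobSelect trait. MB16 mirrors the constant defined in nomos-node; the candidates iterator is a placeholder.

use full_replication::BlobInfo;
use nomos_core::da::blob::{select::FillSize, BlobSelect};

const MB16: usize = 1024 * 1024 * 16;

fn pick_blobs(candidates: impl Iterator<Item = BlobInfo> + 'static) -> Vec<BlobInfo> {
    let selector = <FillSize<MB16, BlobInfo> as BlobSelect>::new(());
    // Items are taken until their cumulative size() reaches the SIZE bound.
    selector.select_blob_from(candidates).collect()
}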

View File

@ -1,63 +0,0 @@
use crate::da::{attestation::Attestation, certificate::Certificate};
#[derive(Clone, Debug, PartialEq)]
pub struct MockAttestation {
voter: [u8; 32],
signature: Vec<u8>,
}
impl MockAttestation {
pub fn new(voter: &[u8; 32], signature: &[u8]) -> Self {
MockAttestation {
voter: *voter,
signature: signature.to_vec(),
}
}
}
impl Attestation for MockAttestation {
type Hash = Vec<u8>;
fn blob_hash(&self) -> Self::Hash {
unimplemented!()
}
fn hash(&self) -> Self::Hash {
vec![0u8]
}
fn signature(&self) -> &[u8] {
&self.signature
}
}
#[derive(Clone, Debug, PartialEq)]
pub struct MockCertificate {
attestations: Vec<MockAttestation>,
}
impl MockCertificate {
pub fn new(attestations: Vec<MockAttestation>) -> Self {
MockCertificate { attestations }
}
}
impl Certificate for MockCertificate {
type Signature = [u8; 32];
type Id = [u8; 32];
type VerificationParameters = ();
fn signers(&self) -> Vec<bool> {
todo!()
}
fn signature(&self) -> Self::Signature {
todo!()
}
fn id(&self) -> Self::Id {
todo!()
}
fn verify(&self, _: Self::VerificationParameters) -> bool {
todo!()
}
}

View File

@ -1,40 +0,0 @@
pub mod metadata;
pub mod mock;
pub mod select;
pub mod vid;
pub trait Certificate {
type Signature;
type Id;
type VerificationParameters;
fn signers(&self) -> Vec<bool>;
fn signature(&self) -> Self::Signature;
fn id(&self) -> Self::Id;
fn verify(&self, verification_params: Self::VerificationParameters) -> bool;
}
pub trait BlobCertificateSelect {
type Certificate: vid::VidCertificate;
type Settings: Clone;
fn new(settings: Self::Settings) -> Self;
fn select_blob_from<'i, I: Iterator<Item = Self::Certificate> + 'i>(
&self,
certificates: I,
) -> impl Iterator<Item = Self::Certificate> + 'i;
}
pub trait CertificateStrategy {
type Attestation;
type Certificate;
type Metadata: metadata::Metadata;
fn can_build(&self, attestations: &[Self::Attestation]) -> bool;
fn build(
&self,
attestations: Vec<Self::Attestation>,
app_id: <Self::Metadata as metadata::Metadata>::AppId,
index: <Self::Metadata as metadata::Metadata>::Index,
) -> Self::Certificate;
}

View File

@ -1,8 +0,0 @@
use super::metadata::Metadata;
pub trait VidCertificate: Metadata {
type CertificateId;
fn certificate_id(&self) -> Self::CertificateId;
fn size(&self) -> usize;
}

View File

@ -1,29 +1,27 @@
use std::error::Error;
// crates
// internal
pub mod attestation;
pub mod blob;
pub mod certificate;
pub trait DaEncoder {
type EncodedData;
fn encode(b: &[u8]) -> Result<Self::EncodedData, impl Error>;
type Error;
fn encode(&self, b: &[u8]) -> Result<Self::EncodedData, Self::Error>;
}
pub trait DaVerifier {
type DaBlob;
type Attestation;
type Error;
fn verify(&self, blob: &Self::DaBlob) -> Result<Self::Attestation, Self::Error>;
fn verify(&self, blob: &Self::DaBlob) -> Result<(), Self::Error>;
}
pub trait DaDispersal {
type EncodedData;
type Certificate;
type Error;
fn disperse(&self, encoded_data: Self::EncodedData) -> Result<Self::Certificate, impl Error>;
fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error>;
}
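A minimal sketch of the reshaped core traits with a hypothetical no-op backend (NoopEncoder and NoopVerifier are illustrative only): encode now takes &self and reports a backend-defined Error, and verify returns Result<(), Error> instead of producing an Attestation.

use nomos_core::da::{DaEncoder, DaVerifier};

struct NoopEncoder;

impl DaEncoder for NoopEncoder {
    type EncodedData = Vec<u8>;
    type Error = std::convert::Infallible;

    // Encoding is now a method on the encoder instance.
    fn encode(&self, b: &[u8]) -> Result<Self::EncodedData, Self::Error> {
        Ok(b.to_vec())
    }
}

struct NoopVerifier;

impl DaVerifier for NoopVerifier {
    type DaBlob = Vec<u8>;
    type Error = std::convert::Infallible;

    // Success no longer yields an attestation; it is just Ok(()).
    fn verify(&self, _blob: &Self::DaBlob) -> Result<(), Self::Error> {
        Ok(())
    }
}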
pub trait Signer {

View File

@ -1,40 +0,0 @@
use nomos_core::da::{attestation, Signer};
use serde::{Deserialize, Serialize};
use crate::{hash, Voter};
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, PartialOrd, Ord)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Attestation {
blob_hash: [u8; 32],
attester: Voter,
sig: Vec<u8>,
}
impl Attestation {
pub fn new_signed<S: Signer>(blob_hash: [u8; 32], attester: Voter, key_pair: &S) -> Self {
let attestation_hash = hash([blob_hash, attester].concat());
let sig = key_pair.sign(&attestation_hash);
Self {
blob_hash,
attester,
sig,
}
}
}
impl attestation::Attestation for Attestation {
type Hash = [u8; 32];
fn blob_hash(&self) -> Self::Hash {
self.blob_hash
}
fn hash(&self) -> Self::Hash {
hash([self.blob_hash, self.attester].concat())
}
fn signature(&self) -> &[u8] {
self.sig.as_ref()
}
}

View File

@ -1,83 +1,21 @@
pub mod attestation;
use attestation::Attestation;
use nomos_core::da::attestation::Attestation as _;
use nomos_core::da::certificate::metadata::Next;
use nomos_core::da::certificate::CertificateStrategy;
// internal
use nomos_core::da::certificate::{self, metadata};
// std
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
// crates
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
use bytes::Bytes;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::{self, Next};
use serde::{Deserialize, Serialize};
// internal
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct Index([u8; 8]);
/// Re-export the types for OpenAPI
#[cfg(feature = "openapi")]
pub mod openapi {
pub use super::Certificate;
}
#[derive(Debug, Clone)]
pub struct AbsoluteNumber<A, C> {
num_attestations: usize,
_a: std::marker::PhantomData<A>,
_c: std::marker::PhantomData<C>,
}
impl<A, C> AbsoluteNumber<A, C> {
pub fn new(num_attestations: usize) -> Self {
Self {
num_attestations,
_a: std::marker::PhantomData,
_c: std::marker::PhantomData,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Settings {
pub voter: Voter,
pub num_attestations: usize,
}
impl CertificateStrategy for AbsoluteNumber<Attestation, Certificate> {
type Attestation = Attestation;
type Certificate = Certificate;
type Metadata = Certificate;
fn can_build(&self, attestations: &[Self::Attestation]) -> bool {
attestations.len() >= self.num_attestations
&& attestations
.iter()
.map(|a| a.blob_hash())
.collect::<HashSet<_>>()
.len()
== 1
}
fn build(
&self,
attestations: Vec<Self::Attestation>,
app_id: [u8; 32],
index: Index,
) -> Certificate {
assert!(self.can_build(&attestations));
Certificate {
attestations,
metadata: Metadata { app_id, index },
}
}
}
pub type Voter = [u8; 32];
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
@ -93,6 +31,10 @@ pub struct Metadata {
}
impl Metadata {
pub fn new(app_id: [u8; 32], index: Index) -> Self {
Self { app_id, index }
}
fn size(&self) -> usize {
std::mem::size_of_val(&self.app_id) + std::mem::size_of_val(&self.index)
}
@ -100,67 +42,27 @@ impl Metadata {
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Certificate {
attestations: Vec<Attestation>,
metadata: Metadata,
}
impl Hash for Certificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(<Certificate as certificate::Certificate>::id(self).as_ref());
}
}
#[derive(Clone, Debug)]
pub struct CertificateVerificationParameters {
pub threshold: usize,
}
impl certificate::Certificate for Certificate {
type Id = [u8; 32];
type Signature = [u8; 32];
type VerificationParameters = CertificateVerificationParameters;
fn signature(&self) -> Self::Signature {
let mut attestations = self.attestations.clone();
attestations.sort();
let mut signatures = Vec::new();
for attestation in &attestations {
signatures.extend_from_slice(attestation.signature());
}
hash(signatures)
}
fn id(&self) -> Self::Id {
let mut input = self
.attestations
.iter()
.map(|a| a.signature())
.collect::<Vec<_>>();
// sort to make the hash deterministic
input.sort();
hash(input.concat())
}
fn signers(&self) -> Vec<bool> {
unimplemented!()
}
fn verify(&self, params: Self::VerificationParameters) -> bool {
self.attestations.len() >= params.threshold
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct VidCertificate {
pub struct BlobInfo {
id: [u8; 32],
metadata: Metadata,
}
impl certificate::vid::VidCertificate for VidCertificate {
type CertificateId = [u8; 32];
impl BlobInfo {
pub fn new(id: [u8; 32], metadata: Metadata) -> Self {
Self { id, metadata }
}
}
fn certificate_id(&self) -> Self::CertificateId {
impl Hash for BlobInfo {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(<BlobInfo as DispersedBlobInfo>::blob_id(self).as_ref());
}
}
impl DispersedBlobInfo for BlobInfo {
type BlobId = [u8; 32];
fn blob_id(&self) -> Self::BlobId {
self.id
}
@ -169,38 +71,7 @@ impl certificate::vid::VidCertificate for VidCertificate {
}
}
impl metadata::Metadata for VidCertificate {
type AppId = [u8; 32];
type Index = Index;
fn metadata(&self) -> (Self::AppId, Self::Index) {
(self.metadata.app_id, self.metadata.index)
}
}
impl Hash for VidCertificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(
<VidCertificate as certificate::vid::VidCertificate>::certificate_id(self).as_ref(),
);
}
}
impl From<Certificate> for VidCertificate {
fn from(cert: Certificate) -> Self {
// To simulate the property of aggregate commitment + row commitment in Nomos Da Protocol,
// when a full replication certificate is converted into the VID (which should happen after
// the verification in the mempool) the id is set to the blob hash to allow identification
// of the distributed data across nomos nodes.
let id = cert.attestations[0].blob_hash();
Self {
id,
metadata: cert.metadata,
}
}
}
impl metadata::Metadata for Certificate {
impl metadata::Metadata for BlobInfo {
type AppId = [u8; 32];
type Index = Index;
@ -228,11 +99,3 @@ impl AsRef<[u8]> for Index {
self.0.as_ref()
}
}
fn hash(item: impl AsRef<[u8]>) -> [u8; 32] {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(item.as_ref());
let mut output = [0; 32];
hasher.finalize_variable(&mut output).unwrap();
output
}
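For orientation, a sketch of constructing the new full-replication BlobInfo (placeholder values; this assumes Index and Metadata are exported at the crate root alongside BlobInfo):

use full_replication::{BlobInfo, Index, Metadata};
use nomos_core::da::blob::info::DispersedBlobInfo;

fn example_blob_info() -> BlobInfo {
    let blob_id = [0u8; 32];
    let metadata = Metadata::new([1u8; 32], Index::default());
    let info = BlobInfo::new(blob_id, metadata);
    // The id doubles as the identifier of the dispersed data across nodes.
    assert_eq!(info.blob_id(), blob_id);
    info
}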

View File

@ -26,5 +26,5 @@ fn encode<const SIZE: usize>(bencher: Bencher, column_size: usize) {
)
})
.input_counter(|(_, buff)| BytesCount::new(buff.len()))
.bench_refs(|(encoder, buff)| black_box(encoder.encode(buff)));
.bench_refs(|(encoder, buff)| black_box(nomos_core::da::DaEncoder::encode(encoder, buff)));
}

View File

@ -4,6 +4,7 @@ use divan::Bencher;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::encoder::{DaEncoder, DaEncoderParams};
use kzgrs_backend::verifier::DaVerifier;
use nomos_core::da::DaEncoder as _;
use rand::{thread_rng, RngCore};
use std::hint::black_box;

View File

@ -1,32 +0,0 @@
// std
// crates
use blake2::Blake2b;
use nomos_core::da::attestation;
use serde::{Deserialize, Serialize};
use sha3::Digest;
// internal
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Attestation {
pub blob_hash: [u8; 32],
pub signature: Vec<u8>,
}
impl attestation::Attestation for Attestation {
type Hash = [u8; 32];
fn blob_hash(&self) -> Self::Hash {
self.blob_hash
}
fn hash(&self) -> Self::Hash {
Blake2b::new()
.chain_update(self.blob_hash)
.finalize()
.into()
}
fn signature(&self) -> &[u8] {
self.signature.as_ref()
}
}

View File

@ -1,4 +1,3 @@
pub mod attestation;
pub mod blob;
// std

View File

@ -1,151 +1,40 @@
// std
use std::hash::{Hash, Hasher};
// crates
use bitvec::prelude::*;
use blst::min_sig::{AggregateSignature, PublicKey, Signature};
use blst::BLST_ERROR;
use kzgrs::{Commitment, KzgRsError};
use nomos_core::da::certificate::metadata::Next;
use nomos_core::da::certificate::{self, metadata};
use nomos_core::da::blob::{self, metadata::Next};
use serde::{Deserialize, Serialize};
// internal
use crate::common::{attestation::Attestation, build_attestation_message, NOMOS_DA_DST};
use crate::common::{
deserialize_canonical, deserialize_vec_canonical, serialize_canonical, serialize_vec_canonical,
};
use crate::encoder::EncodedData;
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Certificate {
aggregated_signatures: Signature,
signers: BitVec<u8>,
#[serde(
serialize_with = "serialize_canonical",
deserialize_with = "deserialize_canonical"
)]
aggregated_column_commitment: Commitment,
#[serde(
serialize_with = "serialize_vec_canonical",
deserialize_with = "deserialize_vec_canonical"
)]
row_commitments: Vec<Commitment>,
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct BlobInfo {
id: [u8; 32],
metadata: Metadata,
}
impl Certificate {
pub fn id(&self) -> [u8; 32] {
build_attestation_message(&self.aggregated_column_commitment, &self.row_commitments)
impl blob::info::DispersedBlobInfo for BlobInfo {
type BlobId = [u8; 32];
fn blob_id(&self) -> Self::BlobId {
self.id
}
pub fn verify(&self, nodes_public_keys: &[PublicKey]) -> bool {
let signers_keys: Vec<&PublicKey> = nodes_public_keys
.iter()
.enumerate()
.filter(|(index, _)| self.signers[*index])
.map(|(_, pk)| pk)
.collect();
let message = self.id();
let messages: Vec<&[u8]> = std::iter::repeat(message.as_slice())
.take(signers_keys.len())
.collect();
verify_aggregate_signature(&self.aggregated_signatures, &signers_keys, &messages)
}
pub fn build_certificate(
encoded_data: &EncodedData,
attestations: &[Attestation],
signers: BitVec<u8>,
threshold: usize,
metadata: Metadata,
) -> Result<Self, KzgRsError> {
if attestations.len() < threshold {
return Err(KzgRsError::NotEnoughAttestations {
required: threshold,
received: attestations.len(),
});
}
if attestations.len() != signers.count_ones() {
return Err(KzgRsError::AttestationSignersMismatch {
attestations_count: attestations.len(),
signers_count: signers.count_ones(),
});
}
let signatures: Vec<Signature> = attestations
.iter()
.filter_map(|att| Signature::from_bytes(&att.signature).ok())
.collect();
// Certificate will fail to be built if number of valid signatures from the attestations
// doesn't satisfy the same threshold used for attestations.
if signatures.len() < threshold {
return Err(KzgRsError::NotEnoughAttestations {
required: threshold,
received: signatures.len(),
});
}
let aggregated_signatures = aggregate_signatures(signatures)?;
Ok(Self {
aggregated_signatures,
signers,
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
row_commitments: encoded_data.row_commitments.clone(),
metadata,
})
fn size(&self) -> usize {
std::mem::size_of_val(&self.id) + self.metadata.size()
}
}
impl Hash for Certificate {
impl Hash for BlobInfo {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(<Certificate as certificate::Certificate>::id(self).as_ref());
state.write(<BlobInfo as blob::info::DispersedBlobInfo>::blob_id(self).as_ref());
}
}
fn aggregate_signatures(signatures: Vec<Signature>) -> Result<Signature, BLST_ERROR> {
let refs: Vec<&Signature> = signatures.iter().collect();
AggregateSignature::aggregate(&refs, true).map(|agg_sig| agg_sig.to_signature())
}
impl blob::metadata::Metadata for BlobInfo {
type AppId = [u8; 32];
type Index = Index;
fn verify_aggregate_signature(
aggregate_signature: &Signature,
public_keys: &[&PublicKey],
messages: &[&[u8]],
) -> bool {
BLST_ERROR::BLST_SUCCESS
== aggregate_signature.aggregate_verify(true, messages, NOMOS_DA_DST, public_keys, true)
}
#[derive(Clone, Debug)]
pub struct CertificateVerificationParameters {
pub nodes_public_keys: Vec<PublicKey>,
}
impl certificate::Certificate for Certificate {
type Signature = Signature;
type Id = [u8; 32];
type VerificationParameters = CertificateVerificationParameters;
fn signers(&self) -> Vec<bool> {
self.signers.iter().map(|b| *b).collect()
}
fn signature(&self) -> Self::Signature {
self.aggregated_signatures
}
fn id(&self) -> Self::Id {
build_attestation_message(&self.aggregated_column_commitment, &self.row_commitments)
}
fn verify(&self, params: Self::VerificationParameters) -> bool {
self.verify(&params.nodes_public_keys)
fn metadata(&self) -> (Self::AppId, Self::Index) {
(self.metadata.app_id, self.metadata.index)
}
}
@ -164,59 +53,6 @@ impl Metadata {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct VidCertificate {
id: [u8; 32],
metadata: Metadata,
}
impl certificate::vid::VidCertificate for VidCertificate {
type CertificateId = [u8; 32];
fn certificate_id(&self) -> Self::CertificateId {
self.id
}
fn size(&self) -> usize {
std::mem::size_of_val(&self.id) + self.metadata.size()
}
}
impl metadata::Metadata for VidCertificate {
type AppId = [u8; 32];
type Index = Index;
fn metadata(&self) -> (Self::AppId, Self::Index) {
(self.metadata.app_id, self.metadata.index)
}
}
impl Hash for VidCertificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(
<VidCertificate as certificate::vid::VidCertificate>::certificate_id(self).as_ref(),
);
}
}
impl From<Certificate> for VidCertificate {
fn from(cert: Certificate) -> Self {
Self {
id: cert.id(),
metadata: cert.metadata,
}
}
}
impl metadata::Metadata for Certificate {
type AppId = [u8; 32];
type Index = Index;
fn metadata(&self) -> (Self::AppId, Self::Index) {
(self.metadata.app_id, self.metadata.index)
}
}
impl From<u64> for Index {
fn from(value: u64) -> Self {
Self(value.to_be_bytes())
@ -239,13 +75,12 @@ impl AsRef<[u8]> for Index {
#[cfg(test)]
mod tests {
use bitvec::prelude::*;
use blst::min_sig::{PublicKey, SecretKey};
use rand::{rngs::OsRng, thread_rng, Rng, RngCore};
use blst::min_sig::SecretKey;
use nomos_core::da::DaEncoder as _;
use rand::{thread_rng, RngCore};
use crate::{
common::{attestation::Attestation, blob::DaBlob, NOMOS_DA_DST},
dispersal::{aggregate_signatures, verify_aggregate_signature, Metadata},
common::blob::DaBlob,
encoder::{
test::{rand_data, ENCODER},
EncodedData,
@ -253,20 +88,7 @@ mod tests {
verifier::DaVerifier,
};
use super::Certificate;
fn generate_keys() -> (PublicKey, SecretKey) {
let mut rng = OsRng;
let sk_bytes: [u8; 32] = rng.gen();
let sk = SecretKey::key_gen(&sk_bytes, &[]).unwrap();
let pk = sk.sk_to_pk();
(pk, sk)
}
fn attest_encoded_data(
encoded_data: &EncodedData,
verifiers: &[DaVerifier],
) -> Vec<Attestation> {
fn attest_encoded_data(encoded_data: &EncodedData, verifiers: &[DaVerifier]) -> Vec<bool> {
let mut attestations = Vec::new();
let domain_size = encoded_data.extended_data.0[0].len();
for (i, column) in encoded_data.extended_data.columns().enumerate() {
@ -283,60 +105,13 @@ mod tests {
.map(|proofs| proofs.get(i).cloned().unwrap())
.collect(),
};
attestations.push(verifier.verify(&da_blob, domain_size).unwrap());
attestations.push(verifier.verify(&da_blob, domain_size));
}
attestations
}
#[test]
fn test_signature_aggregation_and_verification() {
let (pk1, sk1) = generate_keys();
let (pk2, sk2) = generate_keys();
let (pk3, sk3) = generate_keys();
let message = b"Test message";
let sig1 = sk1.sign(message, NOMOS_DA_DST, &[]);
let sig2 = sk2.sign(message, NOMOS_DA_DST, &[]);
let sig3 = sk3.sign(message, NOMOS_DA_DST, &[]);
let aggregated_signature = aggregate_signatures(vec![sig1, sig2, sig3]).unwrap();
let public_keys = vec![&pk1, &pk2, &pk3];
let messages = vec![message.as_ref(), message.as_ref(), message.as_ref()];
let result = verify_aggregate_signature(&aggregated_signature, &public_keys, &messages);
assert!(result, "Aggregated signature should be valid.");
}
#[test]
fn test_invalid_signature_aggregation() {
let (pk1, sk1) = generate_keys();
let (pk2, sk2) = generate_keys();
let (_, sk3) = generate_keys();
let message = b"Test message";
let sig1 = sk1.sign(message, NOMOS_DA_DST, &[]);
let sig2 = sk2.sign(message, NOMOS_DA_DST, &[]);
let sig3 = sk3.sign(message, NOMOS_DA_DST, &[]);
let aggregated_signature = aggregate_signatures(vec![sig1, sig2, sig3]).unwrap();
let (wrong_pk3, _) = generate_keys(); // Generate another key pair for the "wrong" public key
let public_keys = vec![&pk1, &pk2, &wrong_pk3]; // Incorrect public key for sig3 to demonstrate failure.
let messages = vec![message.as_ref(), message.as_ref(), message.as_ref()];
let result = verify_aggregate_signature(&aggregated_signature, &public_keys, &messages);
assert!(
!result,
"Aggregated signature with a mismatched public key should not be valid."
);
}
#[test]
fn test_encoded_data_verification() {
const THRESHOLD: usize = 16;
let encoder = &ENCODER;
let data = rand_data(8);
let mut rng = thread_rng();
@ -360,108 +135,6 @@ mod tests {
let attestations = attest_encoded_data(&encoded_data, &verifiers);
let signers = bitvec![u8, Lsb0; 1; 16];
let cert = Certificate::build_certificate(
&encoded_data,
&attestations,
signers,
THRESHOLD,
Metadata::default(),
)
.unwrap();
let public_keys: Vec<PublicKey> = sks.iter().map(|sk| sk.sk_to_pk()).collect();
assert!(cert.verify(&public_keys));
}
#[test]
fn test_encoded_data_insufficient_verification() {
const THRESHOLD: usize = 16;
let encoder = &ENCODER;
let data = rand_data(8);
let mut rng = thread_rng();
let sks: Vec<SecretKey> = (0..16)
.map(|_| {
let mut buff = [0u8; 32];
rng.fill_bytes(&mut buff);
SecretKey::key_gen(&buff, &[]).unwrap()
})
.collect();
let verifiers: Vec<DaVerifier> = sks
.clone()
.into_iter()
.enumerate()
.map(|(index, sk)| DaVerifier { sk, index })
.collect();
let encoded_data = encoder.encode(&data).unwrap();
let mut attestations = attest_encoded_data(&encoded_data, &verifiers);
// Imitate missing attestation.
attestations.pop();
let signers = bitvec![u8, Lsb0; 1; 16];
let cert_result = Certificate::build_certificate(
&encoded_data,
&attestations,
signers,
THRESHOLD,
Metadata::default(),
);
// Certificate won't be created because of not reaching required threshold.
assert!(cert_result.is_err());
}
#[test]
fn test_encoded_data_wrong_pk_verification() {
const THRESHOLD: usize = 16;
let encoder = &ENCODER;
let data = rand_data(8);
let mut rng = thread_rng();
let sks: Vec<SecretKey> = (0..16)
.map(|_| {
let mut buff = [0u8; 32];
rng.fill_bytes(&mut buff);
SecretKey::key_gen(&buff, &[]).unwrap()
})
.collect();
let verifiers: Vec<DaVerifier> = sks
.clone()
.into_iter()
.enumerate()
.map(|(index, sk)| DaVerifier { sk, index })
.collect();
let encoded_data = encoder.encode(&data).unwrap();
let attestations = attest_encoded_data(&encoded_data, &verifiers);
let signers = bitvec![u8, Lsb0; 1; 16];
let cert = Certificate::build_certificate(
&encoded_data,
&attestations,
signers,
THRESHOLD,
Metadata::default(),
)
.unwrap();
let mut public_keys: Vec<PublicKey> = sks.iter().map(|sk| sk.sk_to_pk()).collect();
// Imitate different set of public keys on the verifier side.
let (wrong_pk, _) = generate_keys();
public_keys.pop();
public_keys.push(wrong_pk);
// Certificate should fail to be verified.
assert!(!cert.verify(&public_keys));
assert!(!attestations.contains(&false));
}
}

View File

@ -208,8 +208,13 @@ impl DaEncoder {
.collect(),
)
}
}
pub fn encode(&self, data: &[u8]) -> Result<EncodedData, KzgRsError> {
impl nomos_core::da::DaEncoder for DaEncoder {
type EncodedData = EncodedData;
type Error = KzgRsError;
fn encode(&self, data: &[u8]) -> Result<EncodedData, KzgRsError> {
let chunked_data = self.chunkify(data);
let row_domain = PolynomialEvaluationDomain::new(self.params.column_count)
.expect("Domain should be able to build");
@ -263,6 +268,7 @@ pub mod test {
decode, verify_element_proof, FieldElement, PolynomialEvaluationDomain,
BYTES_PER_FIELD_ELEMENT,
};
use nomos_core::da::DaEncoder as _;
use rand::RngCore;
use std::ops::Div;
@ -376,7 +382,7 @@ pub mod test {
assert!(verify_element_proof(
i,
&element,
&commitment,
commitment,
&proofs[i],
domain,
&GLOBAL_PARAMETERS
@ -391,7 +397,7 @@ pub mod test {
assert!(verify_element_proof(
i,
&element,
&commitment,
commitment,
&proofs[i],
domain,
&GLOBAL_PARAMETERS

View File

@ -11,11 +11,8 @@ use kzgrs::{
};
use crate::common::blob::DaBlob;
use crate::common::NOMOS_DA_DST;
// internal
use crate::common::{
attestation::Attestation, build_attestation_message, hash_column_and_commitment, Chunk, Column,
};
use crate::common::{hash_column_and_commitment, Chunk, Column};
use crate::encoder::DaEncoderParams;
use crate::global::GLOBAL_PARAMETERS;
@ -118,23 +115,7 @@ impl DaVerifier {
true
}
fn build_attestation(&self, blob: &DaBlob) -> Attestation {
let message =
build_attestation_message(&blob.aggregated_column_commitment, &blob.rows_commitments);
let signature = self.sk.sign(&message, NOMOS_DA_DST, b"");
let blob_id = blob.id();
let blob_hash: [u8; 32] = blob_id
.try_into()
.expect("Blob ID must be exactly 32 bytes long");
Attestation {
signature: signature.to_bytes().to_vec(),
blob_hash,
}
}
pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> Option<Attestation> {
pub fn verify(&self, blob: &DaBlob, rows_domain_size: usize) -> bool {
let rows_domain = PolynomialEvaluationDomain::new(rows_domain_size)
.expect("Domain should be able to build");
let is_column_verified = DaVerifier::verify_column(
@ -146,7 +127,7 @@ impl DaVerifier {
rows_domain,
);
if !is_column_verified {
return None;
return false;
}
let are_chunks_verified = DaVerifier::verify_chunks(
@ -157,9 +138,9 @@ impl DaVerifier {
rows_domain,
);
if !are_chunks_verified {
return None;
return false;
}
Some(self.build_attestation(blob))
true
}
}
@ -176,6 +157,7 @@ mod test {
use kzgrs::{
bytes_to_polynomial, commit_polynomial, generate_element_proof, BYTES_PER_FIELD_ELEMENT,
};
use nomos_core::da::DaEncoder as _;
use rand::{thread_rng, RngCore};
#[test]
@ -251,7 +233,7 @@ mod test {
.map(|proofs| proofs.get(i).cloned().unwrap())
.collect(),
};
assert!(verifier.verify(&da_blob, domain_size).is_some());
assert!(verifier.verify(&da_blob, domain_size));
}
}
}

View File

@ -9,32 +9,25 @@ use cryptarchia_consensus::{
network::adapters::libp2p::LibP2pAdapter as ConsensusNetworkAdapter, ConsensusMsg,
CryptarchiaConsensus, CryptarchiaInfo,
};
use kzgrs_backend::dispersal::{Certificate, VidCertificate};
use kzgrs_backend::dispersal::BlobInfo;
use nomos_core::{
da::certificate::{self, select::FillSize as FillSizeWithBlobsCertificate},
da::blob::{self, select::FillSize as FillSizeWithBlobs},
header::HeaderId,
tx::{select::FillSize as FillSizeWithTx, Transaction},
};
use nomos_mempool::{
backend::mockpool::MockPool,
da::verify::kzgrs::DaVerificationProvider as MempoolVerificationProvider,
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
};
use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde};
pub type Cryptarchia<Tx, SS, const SIZE: usize> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, Certificate>,
ConsensusNetworkAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
MockPool<HeaderId, BlobInfo, <BlobInfo as blob::info::DispersedBlobInfo>::BlobId>,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as blob::info::DispersedBlobInfo>::BlobId>,
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobsCertificate<SIZE, VidCertificate>,
FillSizeWithBlobs<SIZE, BlobInfo>,
RocksBackend<SS>,
>;

View File

@ -1,9 +1,7 @@
use bytes::Bytes;
use core::ops::Range;
use nomos_core::da::attestation::Attestation;
use nomos_core::da::blob::Blob;
use nomos_core::da::certificate::metadata::Metadata;
use nomos_core::da::certificate::{self, select::FillSize as FillSizeWithBlobsCertificate};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::{metadata::Metadata, select::FillSize as FillSizeWithBlobs, Blob};
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::header::HeaderId;
use nomos_core::tx::{select::FillSize as FillSizeWithTx, Transaction};
@ -18,7 +16,6 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStora
use nomos_da_verifier::{DaVerifierMsg, DaVerifierService};
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::verify::MempoolVerificationProvider;
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
@ -30,7 +27,7 @@ use std::fmt::Debug;
use std::hash::Hash;
use tokio::sync::oneshot;
pub type DaIndexer<Tx, C, V, VP, SS, const SIZE: usize> = DataIndexerService<
pub type DaIndexer<Tx, C, V, SS, const SIZE: usize> = DataIndexerService<
// Indexer specific.
Bytes,
IndexerStorageAdapter<SS, V>,
@ -40,10 +37,9 @@ pub type DaIndexer<Tx, C, V, VP, SS, const SIZE: usize> = DataIndexerService<
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, V, [u8; 32]>,
MempoolNetworkAdapter<C, <C as certificate::Certificate>::Id>,
VP,
MempoolNetworkAdapter<C, <C as DispersedBlobInfo>::BlobId>,
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobsCertificate<SIZE, V>,
FillSizeWithBlobs<SIZE, V>,
RocksBackend<SS>,
>;
@ -53,12 +49,12 @@ pub type DaVerifier<A, B, VB, SS> =
pub async fn add_blob<A, B, VB, SS>(
handle: &OverwatchHandle,
blob: B,
) -> Result<Option<A>, DynError>
) -> Result<Option<()>, DynError>
where
A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B, Attestation = A>,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,
SS: StorageSerde + Send + Sync + 'static,
@ -76,7 +72,7 @@ where
Ok(receiver.await?)
}
pub async fn get_range<Tx, C, V, VP, SS, const SIZE: usize>(
pub async fn get_range<Tx, C, V, SS, const SIZE: usize>(
handle: &OverwatchHandle,
app_id: <V as Metadata>::AppId,
range: Range<<V as Metadata>::Index>,
@ -93,7 +89,7 @@ where
+ Sync
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
C: certificate::Certificate<Id = [u8; 32]>
C: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
+ Serialize
@ -101,8 +97,8 @@ where
+ Send
+ Sync
+ 'static,
<C as certificate::Certificate>::Id: Clone + Send + Sync,
V: certificate::vid::VidCertificate<CertificateId = [u8; 32]>
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
V: DispersedBlobInfo<BlobId = [u8; 32]>
+ From<C>
+ Eq
+ Debug
@ -114,18 +110,14 @@ where
+ Send
+ Sync
+ 'static,
<V as certificate::vid::VidCertificate>::CertificateId: Debug + Clone + Ord + Hash,
<V as DispersedBlobInfo>::BlobId: Debug + Clone + Ord + Hash,
<V as Metadata>::AppId: AsRef<[u8]> + Serialize + Clone + Send + Sync,
<V as Metadata>::Index:
AsRef<[u8]> + Serialize + DeserializeOwned + Clone + PartialOrd + Send + Sync,
VP: MempoolVerificationProvider<
Payload = C,
Parameters = <C as certificate::Certificate>::VerificationParameters,
>,
SS: StorageSerde + Send + Sync + 'static,
{
let relay = handle
.relay::<DaIndexer<Tx, C, V, VP, SS, SIZE>>()
.relay::<DaIndexer<Tx, C, V, SS, SIZE>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -1,8 +1,8 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::{da::certificate::Certificate, header::HeaderId};
use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId};
use nomos_mempool::{
backend::mockpool::MockPool, network::NetworkAdapter, verify::MempoolVerificationProvider,
DaMempoolService, MempoolMsg, TxMempoolService,
backend::mockpool::MockPool, network::NetworkAdapter, DaMempoolService, MempoolMsg,
TxMempoolService,
};
use nomos_network::backends::NetworkBackend;
use tokio::sync::oneshot;
@ -49,17 +49,13 @@ pub async fn add_cert<N, A, V, Item, Key>(
where
N: NetworkBackend,
A: NetworkAdapter<Backend = N, Key = Key> + Send + Sync + 'static,
A::Payload: Certificate + Into<Item> + Debug,
A::Payload: DispersedBlobInfo + Into<Item> + Debug,
A::Settings: Send + Sync,
V: MempoolVerificationProvider<
Payload = A::Payload,
Parameters = <A::Payload as Certificate>::VerificationParameters,
>,
Item: Clone + Debug + Send + Sync + 'static + Hash,
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<DaMempoolService<A, MockPool<HeaderId, Item, Key>, V>>()
.relay::<DaMempoolService<A, MockPool<HeaderId, Item, Key>>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -8,7 +8,7 @@ use nomos_storage::{
pub async fn block_req<S, Tx>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
id: HeaderId,
) -> Result<Option<Block<Tx, full_replication::Certificate>>, super::DynError>
) -> Result<Option<Block<Tx, full_replication::BlobInfo>>, super::DynError>
where
Tx: serde::Serialize + serde::de::DeserializeOwned + Clone + Eq + core::hash::Hash,
S: StorageSerde + Send + Sync + 'static,

View File

@ -7,9 +7,8 @@ use cryptarchia_engine::Slot;
use cryptarchia_ledger::{Coin, LeaderProof, LedgerState};
use futures::StreamExt;
use network::{messages::NetworkMessage, NetworkAdapter};
use nomos_core::da::certificate::{
metadata::Metadata as CertificateMetadata, vid::VidCertificate, BlobCertificateSelect,
Certificate,
use nomos_core::da::blob::{
info::DispersedBlobInfo, metadata::Metadata as BlobMetadata, BlobSelect,
};
use nomos_core::header::{cryptarchia::Header, HeaderId};
use nomos_core::tx::{Transaction, TxSelect};
@ -17,7 +16,6 @@ use nomos_core::{
block::{builder::BlockBuilder, Block},
header::cryptarchia::Builder,
};
use nomos_mempool::verify::MempoolVerificationProvider;
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg,
TxMempoolService,
@ -136,34 +134,21 @@ impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
}
}
pub struct CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
> where
pub struct CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
where
A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static,
DaPool::Item: Clone + Eq + Hash + Debug + 'static,
DaPool::Key: Debug + 'static,
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
BS: BlobSelect<BlobId = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
{
service_state: ServiceStateHandle<Self>,
@ -171,24 +156,13 @@ pub struct CryptarchiaConsensus<
// when implementing ServiceCore for CryptarchiaConsensus
network_relay: Relay<NetworkService<A::Backend>>,
cl_mempool_relay: Relay<TxMempoolService<ClPoolAdapter, ClPool>>,
da_mempool_relay: Relay<DaMempoolService<DaPoolAdapter, DaPool, DaVerificationProvider>>,
da_mempool_relay: Relay<DaMempoolService<DaPoolAdapter, DaPool>>,
block_subscription_sender: broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
storage_relay: Relay<StorageService<Storage>>,
}
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, DaVerificationProvider, TxS, BS, Storage>
ServiceData
for CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
>
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceData
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
where
A: NetworkAdapter,
ClPool: MemPool<BlockId = HeaderId>,
@ -199,13 +173,9 @@ where
DaPool::Key: Debug,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
BS: BlobSelect<BlobId = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
{
const SERVICE_ID: ServiceId = CRYPTARCHIA_ID;
@ -216,19 +186,8 @@ where
}
#[async_trait::async_trait]
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, DaVerificationProvider, TxS, BS, Storage>
ServiceCore
for CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
>
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceCore
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
where
A: NetworkAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
+ Clone
@ -250,8 +209,8 @@ where
+ Sync
+ 'static,
// TODO: Change to specific certificate bounds here
DaPool::Item: VidCertificate<CertificateId = DaPool::Key>
+ CertificateMetadata
DaPool::Item: DispersedBlobInfo<BlobId = DaPool::Key>
+ BlobMetadata
+ Debug
+ Clone
+ Eq
@ -266,16 +225,10 @@ where
ClPoolAdapter:
MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key> + Send + Sync + 'static,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
> + Send
+ Sync
+ 'static,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
TxS::Settings: Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
BS: BlobSelect<BlobId = DaPool::Item> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static,
Storage: StorageBackend + Send + Sync + 'static,
{
@ -414,18 +367,8 @@ where
}
}
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, DaVerificationProvider, TxS, BS, Storage>
CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
>
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
@ -442,8 +385,8 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: VidCertificate<CertificateId = DaPool::Key>
+ CertificateMetadata
DaPool::Item: DispersedBlobInfo<BlobId = DaPool::Key>
+ BlobMetadata
+ Debug
+ Clone
+ Eq
@ -454,19 +397,13 @@ where
+ Sync
+ 'static,
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
BS: BlobSelect<BlobId = DaPool::Item> + Clone + Send + Sync + 'static,
ClPool::Key: Debug + Send + Sync,
DaPool::Key: Debug + Send + Sync,
ClPoolAdapter:
MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key> + Send + Sync + 'static,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
> + Send
+ Sync
+ 'static,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
Storage: StorageBackend + Send + Sync + 'static,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
@ -578,7 +515,7 @@ where
mark_in_block(
da_mempool_relay,
block.blobs().map(VidCertificate::certificate_id),
block.blobs().map(DispersedBlobInfo::blob_id),
id,
)
.await;

View File

@ -10,12 +10,9 @@ use cryptarchia_consensus::network::NetworkAdapter;
use cryptarchia_consensus::CryptarchiaConsensus;
use futures::StreamExt;
use nomos_core::block::Block;
use nomos_core::da::certificate::metadata::Metadata;
use nomos_core::da::certificate::vid::VidCertificate;
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::da::blob::{info::DispersedBlobInfo, metadata::Metadata, BlobSelect};
use nomos_core::header::HeaderId;
use nomos_core::tx::{Transaction, TxSelect};
use nomos_mempool::verify::MempoolVerificationProvider;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter};
use nomos_storage::backends::StorageBackend;
use nomos_storage::StorageService;
@ -31,29 +28,8 @@ use storage::DaStorageAdapter;
use tokio::sync::oneshot::Sender;
use tracing::error;
pub type ConsensusRelay<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
> = Relay<
CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
Storage,
>,
>;
pub type ConsensusRelay<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> =
Relay<CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>>;
pub struct DataIndexerService<
B,
@ -64,7 +40,6 @@ pub struct DataIndexerService<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -75,39 +50,26 @@ pub struct DataIndexerService<
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static,
DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static,
DaPool::Key: Debug + 'static,
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
DaStorage: DaStorageAdapter<VID = DaPool::Item, Blob = B>,
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
{
service_state: ServiceStateHandle<Self>,
storage_relay: Relay<StorageService<DaStorage::Backend>>,
consensus_relay: ConsensusRelay<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
>,
consensus_relay:
ConsensusRelay<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, ConsensusStorage>,
}
pub enum DaMsg<B, V: Metadata> {
AddIndex {
vid: V,
info: V,
},
GetRange {
app_id: <V as Metadata>::AppId,
@ -140,7 +102,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -154,7 +115,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -166,19 +126,15 @@ where
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static,
DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static,
DaPool::Key: Debug + 'static,
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
DaStorage: DaStorageAdapter<VID = DaPool::Item, Blob = B>,
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
{
const SERVICE_ID: ServiceId = "DaIndexer";
@ -197,7 +153,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -211,7 +166,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -223,11 +177,7 @@ where
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static,
DaPool::Item: Metadata + Clone + Eq + Hash + Debug + 'static,
@ -235,16 +185,16 @@ where
<DaPool::Item as Metadata>::Index: Send + Sync,
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
DaStorage: DaStorageAdapter<VID = DaPool::Item, Blob = B>,
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
{
async fn handle_new_block(
storage_adapter: &DaStorage,
block: Block<ClPool::Item, DaPool::Item>,
) -> Result<(), DynError> {
for vid in block.blobs() {
storage_adapter.add_index(vid).await?;
for info in block.blobs() {
storage_adapter.add_index(info).await?;
}
Ok(())
}
@ -254,7 +204,7 @@ where
msg: DaMsg<B, DaPool::Item>,
) -> Result<(), DynError> {
match msg {
DaMsg::AddIndex { vid } => storage_adapter.add_index(&vid).await,
DaMsg::AddIndex { info } => storage_adapter.add_index(&info).await,
DaMsg::GetRange {
app_id,
range,
@ -296,7 +246,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -310,7 +259,6 @@ impl<
ClPoolAdapter,
DaPool,
DaPoolAdapter,
DaVerificationProvider,
TxS,
BS,
ConsensusStorage,
@ -322,11 +270,7 @@ where
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: Certificate + Into<DaPool::Item> + Debug,
DaVerificationProvider: MempoolVerificationProvider<
Payload = DaPoolAdapter::Payload,
Parameters = <DaPoolAdapter::Payload as Certificate>::VerificationParameters,
>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
ClPool::Key: Debug + 'static,
DaPool::Key: Debug + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
@ -339,7 +283,7 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: VidCertificate<CertificateId = DaPool::Key>
DaPool::Item: DispersedBlobInfo<BlobId = DaPool::Key>
+ Metadata
+ Debug
+ Clone
@ -355,8 +299,8 @@ where
A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
DaStorage: DaStorageAdapter<VID = DaPool::Item, Blob = B> + Send + Sync + 'static,
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B> + Send + Sync + 'static,
DaStorage::Settings: Clone + Send + Sync + 'static,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
Consensus: ConsensusAdapter<Tx = ClPool::Item, Cert = DaPool::Item> + Send + Sync,

View File

@ -3,9 +3,9 @@ use std::{marker::PhantomData, ops::Range};
use bytes::Bytes;
use futures::{stream::FuturesUnordered, Stream};
use nomos_core::da::certificate::{
use nomos_core::da::blob::{
info::DispersedBlobInfo,
metadata::{Metadata, Next},
vid::VidCertificate,
};
use nomos_da_storage::fs::load_blob;
use nomos_da_storage::rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX, DA_VID_KEY_PREFIX};
@ -20,27 +20,27 @@ use overwatch_rs::{
use crate::storage::DaStorageAdapter;
pub struct RocksAdapter<S, V>
pub struct RocksAdapter<S, B>
where
S: StorageSerde + Send + Sync + 'static,
V: VidCertificate + Metadata + Send + Sync,
B: DispersedBlobInfo + Metadata + Send + Sync,
{
settings: RocksAdapterSettings,
storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
_vid: PhantomData<V>,
_vid: PhantomData<B>,
}
#[async_trait::async_trait]
impl<S, V> DaStorageAdapter for RocksAdapter<S, V>
impl<S, B> DaStorageAdapter for RocksAdapter<S, B>
where
S: StorageSerde + Send + Sync + 'static,
V: VidCertificate<CertificateId = [u8; 32]> + Metadata + Send + Sync,
V::Index: AsRef<[u8]> + Next + Clone + PartialOrd + Send + Sync + 'static,
V::AppId: AsRef<[u8]> + Clone + Send + Sync + 'static,
B: DispersedBlobInfo<BlobId = [u8; 32]> + Metadata + Send + Sync,
B::Index: AsRef<[u8]> + Next + Clone + PartialOrd + Send + Sync + 'static,
B::AppId: AsRef<[u8]> + Clone + Send + Sync + 'static,
{
type Backend = RocksBackend<S>;
type Blob = Bytes;
type VID = V;
type Info = B;
type Settings = RocksAdapterSettings;
async fn new(
@ -54,11 +54,11 @@ where
}
}
async fn add_index(&self, vid: &Self::VID) -> Result<(), DynError> {
let (app_id, idx) = vid.metadata();
async fn add_index(&self, info: &Self::Info) -> Result<(), DynError> {
let (app_id, idx) = info.metadata();
// Check if VID in a block is something that the node've seen before.
let attested_key = key_bytes(DA_ATTESTED_KEY_PREFIX, vid.certificate_id());
// Check if Info in a block is something that the node has seen before.
let attested_key = key_bytes(DA_ATTESTED_KEY_PREFIX, info.blob_id());
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
@ -69,7 +69,7 @@ where
.await
.expect("Failed to send load request to storage relay");
// If node haven't attested this vid, return early.
// If the node hasn't attested this info, return early.
if reply_rx.await?.is_none() {
return Ok(());
}
@ -79,8 +79,8 @@ where
[app_id.clone().as_ref(), idx.as_ref()].concat(),
);
// We are only persisting the id part of VID, the metadata can be derived from the key.
let value = Bytes::from(vid.certificate_id().to_vec());
// We only persist the id part of Info; the metadata can be derived from the key.
let value = Bytes::from(info.blob_id().to_vec());
self.storage_relay
.send(StorageMsg::Store {
@ -93,9 +93,9 @@ where
async fn get_range_stream(
&self,
app_id: <Self::VID as Metadata>::AppId,
index_range: Range<<Self::VID as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::VID as Metadata>::Index, Option<Bytes>)> + Unpin + Send>
app_id: <Self::Info as Metadata>::AppId,
index_range: Range<<Self::Info as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Option<Bytes>)> + Unpin + Send>
{
let futures = FuturesUnordered::new();

View File

@ -3,7 +3,7 @@ pub mod adapters;
use std::ops::Range;
use futures::Stream;
use nomos_core::da::certificate::{metadata::Metadata, vid::VidCertificate};
use nomos_core::da::blob::{info::DispersedBlobInfo, metadata::Metadata};
use nomos_storage::{backends::StorageBackend, StorageService};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
@ -16,17 +16,17 @@ pub trait DaStorageAdapter {
type Settings: Clone;
type Blob;
type VID: VidCertificate;
type Info: DispersedBlobInfo;
async fn new(
config: Self::Settings,
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn add_index(&self, vid: &Self::VID) -> Result<(), DynError>;
async fn add_index(&self, vid: &Self::Info) -> Result<(), DynError>;
async fn get_range_stream(
&self,
app_id: <Self::VID as Metadata>::AppId,
range: Range<<Self::VID as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::VID as Metadata>::Index, Option<Self::Blob>)> + Unpin + Send>;
app_id: <Self::Info as Metadata>::AppId,
range: Range<<Self::Info as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Option<Self::Blob>)> + Unpin + Send>;
}
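As a rough illustration of how the renamed trait is consumed, the sketch below indexes every blob info from a block and then streams a range of blobs back. It is a hypothetical caller, not part of this change, and assumes the imports of the module above (DaStorageAdapter, Metadata, Range, DynError) plus futures::StreamExt; the Send/Sync bounds required by #[async_trait] are elided for brevity.

async fn index_block_and_query<S: DaStorageAdapter>(
    storage: &S,
    infos: &[S::Info],
    app_id: <S::Info as Metadata>::AppId,
    range: Range<<S::Info as Metadata>::Index>,
) -> Result<(), DynError> {
    // Persist the (app_id, index) -> blob id mapping for every blob info in the block.
    for info in infos {
        storage.add_index(info).await?;
    }
    // Stream the stored blobs back for the requested index range; an item carries
    // Some(blob) only for indexes this node has attested and stored.
    let mut stream = storage.get_range_stream(app_id, range).await;
    while let Some((_index, _maybe_blob)) = stream.next().await {}
    Ok(())
}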

View File

@ -1,8 +1,7 @@
use bytes::Bytes;
use full_replication::{Certificate, VidCertificate};
use kzgrs_backend::common::attestation::Attestation;
use full_replication::BlobInfo;
use kzgrs_backend::common::blob::DaBlob;
use nomos_core::{da::certificate, header::HeaderId, tx::Transaction};
use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transaction};
use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter;
use nomos_da_indexer::DataIndexerService;
@ -11,52 +10,40 @@ use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
use nomos_da_verifier::DaVerifierService;
use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig};
use nomos_mempool::da::verify::fullreplication::DaVerificationProvider as MempoolVerificationProvider;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
use nomos_storage::backends::rocksdb::RocksBackend;
pub use nomos_core::{
da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx,
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
};
use nomos_mempool::da::service::DaMempoolService;
use nomos_node::{Tx, Wire};
pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, VidCertificate>,
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, VidCertificate>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
>;
pub(crate) type DaIndexer = DataIndexerService<
// Indexer specific.
Bytes,
IndexerStorageAdapter<Wire, full_replication::VidCertificate>,
CryptarchiaConsensusAdapter<Tx, full_replication::VidCertificate>,
IndexerStorageAdapter<Wire, full_replication::BlobInfo>,
CryptarchiaConsensusAdapter<Tx, full_replication::BlobInfo>,
// Cryptarchia specific; should be the same as in the `Cryptarchia` type above.
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, VidCertificate>,
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, VidCertificate>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
>;
@ -66,19 +53,14 @@ pub(crate) type TxMempool = TxMempoolService<
>;
pub(crate) type DaMempool = DaMempoolService<
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolVerificationProvider,
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
>;
pub(crate) type DaVerifier = DaVerifierService<
KzgrsDaVerifier,
Libp2pAdapter<DaBlob, Attestation>,
VerifierStorageAdapter<Attestation, DaBlob, Wire>,
Libp2pAdapter<DaBlob, ()>,
VerifierStorageAdapter<(), DaBlob, Wire>,
>;
pub(crate) const MB16: usize = 1024 * 1024 * 16;

View File

@ -6,23 +6,14 @@ use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;
// crates
use blake2::{
digest::{Update, VariableOutput},
Blake2bVar,
};
use bytes::Bytes;
use cryptarchia_consensus::ConsensusMsg;
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::{Coin, LedgerState};
use full_replication::attestation::Attestation;
use full_replication::{Certificate, VidCertificate};
use nomos_core::da::attestation::Attestation as TraitAttestation;
use nomos_core::da::certificate::metadata::Metadata;
use nomos_core::da::certificate::vid::VidCertificate as _;
use nomos_core::da::certificate::Certificate as _;
use nomos_core::da::certificate::CertificateStrategy;
use nomos_core::da::Signer;
use nomos_core::{da::certificate, tx::Transaction};
use full_replication::{BlobInfo, Metadata};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata as _;
use nomos_core::tx::Transaction;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_storage::fs::write_blob;
@ -83,10 +74,7 @@ fn new_node(
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <Certificate as certificate::Certificate>::id,
},
verification_provider: full_replication::CertificateVerificationParameters {
threshold: 0,
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
@ -115,24 +103,6 @@ fn new_node(
.unwrap()
}
fn hash(item: impl AsRef<[u8]>) -> [u8; 32] {
let mut hasher = Blake2bVar::new(32).unwrap();
hasher.update(item.as_ref());
let mut output = [0; 32];
hasher.finalize_variable(&mut output).unwrap();
output
}
fn signature(attestation: &Attestation) -> [u8; 32] {
let mut attestations = vec![attestation];
attestations.sort();
let mut signatures = Vec::new();
for attestation in &attestations {
signatures.extend_from_slice(attestation.signature());
}
hash(signatures)
}
// TODO: When the verifier is implemented, this test should be removed and a new one
// added in the integration tests crate using the real node.
@ -212,43 +182,26 @@ fn test_indexer() {
let app_id = [7u8; 32];
let index = 0.into();
let attestation = Attestation::new_signed(blob_hash, ids[0], &MockKeyPair);
let certificate_strategy = full_replication::AbsoluteNumber::new(1);
let cert = certificate_strategy.build(vec![attestation.clone()], app_id, index);
let cert_id = cert.id();
let vid: VidCertificate = cert.clone().into();
let range = 0.into()..1.into(); // get idx 0 and 1.
// Test generate hash for Attestation
let hash2 = attestation.hash();
let expected_hash = hash([blob_hash, ids[0]].concat());
assert_eq!(hash2, expected_hash);
// Test generate signature for Certificate
let sig2 = cert.signature();
let expected_signature = signature(&attestation);
assert_eq!(sig2, expected_signature);
let meta = Metadata::new(app_id, index);
let blob_info = BlobInfo::new(blob_hash, meta);
// Test get Metadata for BlobInfo
let (app_id2, index2) = cert.metadata();
let (app_id2, index2) = blob_info.metadata();
assert_eq!(app_id2, app_id);
assert_eq!(index2, index);
// Test generate hash for BlobInfo with default Hasher
let mut default_hasher = DefaultHasher::new();
let _hash3 = <Certificate as Hash>::hash(&cert, &mut default_hasher);
let _hash3 = <BlobInfo as Hash>::hash(&blob_info, &mut default_hasher);
let expected_blob_info = blob_info.clone();
// Mock attestation step where the blob is persisted in the node's blob storage.
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(write_blob(
blobs_dir,
vid.certificate_id().as_ref(),
b"blob",
))
.unwrap();
rt.block_on(write_blob(blobs_dir, blob_info.blob_id().as_ref(), b"blob"))
.unwrap();
node1.spawn(async move {
let mempool_outbound = mempool.connect().await.unwrap();
@ -280,12 +233,12 @@ fn test_indexer() {
.await
.unwrap();
// Put cert into the mempool.
// Put blob_info into the mempool.
let (mempool_tx, mempool_rx) = tokio::sync::oneshot::channel();
mempool_outbound
.send(nomos_mempool::MempoolMsg::Add {
payload: cert,
key: cert_id,
payload: blob_info,
key: blob_hash,
reply_channel: mempool_tx,
})
.await
@ -299,7 +252,7 @@ fn test_indexer() {
loop {
tokio::select! {
Some(block) = broadcast_receiver.next() => {
if block.blobs().any(|v| *v == vid) {
if block.blobs().any(|b| *b == expected_blob_info) {
break;
}
}
@ -312,7 +265,7 @@ fn test_indexer() {
// Give time for services to process and store data.
tokio::time::sleep(Duration::from_secs(1)).await;
// Request range of vids from indexer.
// Request range of blobs from indexer.
let (indexer_tx, indexer_rx) = tokio::sync::oneshot::channel();
indexer_outbound
.send(nomos_da_indexer::DaMsg::GetRange {
@ -324,7 +277,7 @@ fn test_indexer() {
.unwrap();
let mut app_id_blobs = indexer_rx.await.unwrap();
// Since we've only attested to certificate at idx 0, the first
// Since we've only attested to blob_info at idx 0, the first
// item should have "some" data; the other indexes should be None.
app_id_blobs.sort_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap());
let app_id_blobs = app_id_blobs.iter().map(|(_, b)| b).collect::<Vec<_>>();
@ -342,11 +295,3 @@ fn test_indexer() {
}
assert!(is_success_rx.load(SeqCst));
}
struct MockKeyPair;
impl Signer for MockKeyPair {
fn sign(&self, _message: &[u8]) -> Vec<u8> {
vec![]
}
}

View File

@ -1,15 +1,17 @@
// std
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::time::Duration;
// crates
use cryptarchia_consensus::TimeConfig;
use cryptarchia_ledger::{Coin, LedgerState};
use full_replication::Certificate;
use full_replication::BlobInfo;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::encoder::{DaEncoder, DaEncoderParams};
use nomos_core::{da::certificate, tx::Transaction};
use nomos_core::da::{blob::info::DispersedBlobInfo, DaEncoder as _};
use nomos_core::tx::Transaction;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
@ -28,7 +30,7 @@ use overwatch_rs::services::handle::ServiceHandle;
use rand::{thread_rng, Rng, RngCore};
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
// internal
use crate::common::*;
// The client node is only created to interact asynchronously with the nodes in the test.
@ -97,10 +99,7 @@ fn new_node(
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: <Certificate as certificate::Certificate>::id,
},
verification_provider: full_replication::CertificateVerificationParameters {
threshold: 0,
id: <BlobInfo as DispersedBlobInfo>::blob_id,
},
registry: None,
},
@ -282,7 +281,7 @@ fn test_verifier() {
.unwrap();
}
// Create cert
// Wait for response from the verifier.
let a1 = node1_reply_rx.await.unwrap();
let a2 = node2_reply_rx.await.unwrap();
@ -290,7 +289,6 @@ fn test_verifier() {
is_success_tx.store(true, SeqCst);
}
// TODO: Create cert and check indexer integration.
performed_tx.store(true, SeqCst);
});

View File

@ -2,10 +2,7 @@
use core::fmt;
// crates
use blst::{min_sig::PublicKey, min_sig::SecretKey};
use kzgrs_backend::{
common::{attestation::Attestation, blob::DaBlob},
verifier::DaVerifier as NomosKzgrsVerifier,
};
use kzgrs_backend::{common::blob::DaBlob, verifier::DaVerifier as NomosKzgrsVerifier};
use nomos_core::da::DaVerifier;
// internal
use super::VerifierBackend;
@ -40,18 +37,17 @@ impl VerifierBackend for KzgrsDaVerifier {
impl DaVerifier for KzgrsDaVerifier {
type DaBlob = DaBlob;
type Attestation = Attestation;
type Error = KzgrsDaVerifierError;
fn verify(&self, blob: &Self::DaBlob) -> Result<Self::Attestation, Self::Error> {
fn verify(&self, blob: &Self::DaBlob) -> Result<(), Self::Error> {
let blob = blob.clone();
// TODO: Prepare the domain depending on the size; if it is fixed, use a fixed domain,
// otherwise it needs to come with some metadata.
let domain_size = 2usize;
match self.verifier.verify(&blob, domain_size) {
Some(attestation) => Ok(attestation),
None => Err(KzgrsDaVerifierError::VerificationError),
}
self.verifier
.verify(&blob, domain_size)
.then_some(())
.ok_or(KzgrsDaVerifierError::VerificationError)
}
}
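For readers unfamiliar with the idiom, the bool returned by the underlying kzgrs verifier is mapped onto the new unit result with then_some/ok_or. A self-contained toy version of that mapping (names are illustrative only):

// Toy illustration of the bool -> Result mapping used in verify() above.
fn check(ok: bool) -> Result<(), &'static str> {
    ok.then_some(()).ok_or("verification failed")
}

fn main() {
    assert_eq!(check(true), Ok(()));
    assert_eq!(check(false), Err("verification failed"));
}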

View File

@ -47,7 +47,6 @@ where
Backend: VerifierBackend,
Backend::Settings: Clone,
Backend::DaBlob: 'static,
Backend::Attestation: 'static,
Backend::Error: Error,
N: NetworkAdapter,
N::Settings: Clone,
@ -63,26 +62,23 @@ impl<Backend, N, S> DaVerifierService<Backend, N, S>
where
Backend: VerifierBackend + Send + 'static,
Backend::DaBlob: Debug + Send,
Backend::Attestation: Debug + Send,
Backend::Error: Error + Send + Sync,
Backend::Settings: Clone,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation> + Send + 'static,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
N::Settings: Clone,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation>
+ Send
+ 'static,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
{
async fn handle_new_blob(
verifier: &Backend,
storage_adapter: &S,
blob: &Backend::DaBlob,
) -> Result<Backend::Attestation, DynError> {
if let Some(attestation) = storage_adapter.get_attestation(blob).await? {
Ok(attestation)
) -> Result<(), DynError> {
if storage_adapter.get_attestation(blob).await?.is_some() {
Ok(())
} else {
let attestation = verifier.verify(blob)?;
storage_adapter.add_blob(blob, &attestation).await?;
Ok(attestation)
verifier.verify(blob)?;
storage_adapter.add_blob(blob, &()).await?;
Ok(())
}
}
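The rewritten handle_new_blob keeps the same idempotency guard, only the stored attestation shrinks to a unit marker: if the blob was already recorded, verification is skipped; otherwise the blob is verified and the marker is persisted. A minimal, self-contained sketch of that flow, with a hypothetical in-memory set standing in for the storage adapter:

use std::collections::HashSet;

// Hypothetical in-memory stand-in for the storage adapter: remembers verified blob ids.
struct SeenBlobs(HashSet<[u8; 32]>);

impl SeenBlobs {
    fn handle_new_blob(
        &mut self,
        blob_id: [u8; 32],
        verify: impl Fn(&[u8; 32]) -> Result<(), &'static str>,
    ) -> Result<(), &'static str> {
        // Already recorded: skip re-verification (the get_attestation().is_some() branch above).
        if self.0.contains(&blob_id) {
            return Ok(());
        }
        // First time: verify the blob, then persist the unit marker.
        verify(&blob_id)?;
        self.0.insert(blob_id);
        Ok(())
    }
}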
@ -116,7 +112,7 @@ where
type Settings = DaVerifierServiceSettings<Backend::Settings, N::Settings, S::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaVerifierMsg<Backend::DaBlob, Backend::Attestation>;
type Message = DaVerifierMsg<Backend::DaBlob, ()>;
}
#[async_trait::async_trait]
@ -125,17 +121,10 @@ where
Backend: VerifierBackend + Send + Sync + 'static,
Backend::Settings: Clone + Send + Sync + 'static,
Backend::DaBlob: Debug + Send + Sync + 'static,
Backend::Attestation: Debug + Send + Sync + 'static,
Backend::Error: Error + Send + Sync + 'static,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation>
+ Send
+ Sync
+ 'static,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation>
+ Send
+ Sync
+ 'static,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
S::Settings: Clone + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {

View File

@ -2,7 +2,7 @@
use serde::{de::DeserializeOwned, Serialize};
use std::{marker::PhantomData, path::PathBuf};
// crates
use nomos_core::da::{attestation::Attestation, blob::Blob};
use nomos_core::da::blob::Blob;
use nomos_da_storage::{
fs::write_blob,
rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX},
@ -31,7 +31,7 @@ where
#[async_trait::async_trait]
impl<A, B, S> DaStorageAdapter for RocksAdapter<A, B, S>
where
A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync,
A: Serialize + DeserializeOwned + Clone + Send + Sync,
B: Blob + Serialize + Clone + Send + Sync + 'static,
B::BlobId: AsRef<[u8]> + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,

View File

@ -1,3 +1,2 @@
mod network;
pub mod service;
pub mod verify;

View File

@ -12,12 +12,11 @@ use std::fmt::Debug;
// #[cfg(feature = "metrics")]
// use super::metrics::Metrics;
use futures::StreamExt;
use nomos_core::da::certificate::Certificate;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_metrics::NomosRegistry;
// internal
use crate::backend::MemPool;
use crate::network::NetworkAdapter;
use crate::verify::MempoolVerificationProvider;
use crate::{MempoolMetrics, MempoolMsg};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
@ -29,45 +28,36 @@ use overwatch_rs::services::{
};
use tracing::error;
pub struct DaMempoolService<N, P, V>
pub struct DaMempoolService<N, P>
where
N: NetworkAdapter<Key = P::Key>,
N::Payload: Certificate + Into<P::Item> + Debug + 'static,
N::Payload: DispersedBlobInfo + Into<P::Item> + Debug + 'static,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
V: MempoolVerificationProvider<
Payload = N::Payload,
Parameters = <N::Payload as Certificate>::VerificationParameters,
>,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
pool: P,
verification_provider: V,
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// metrics: Option<Metrics>,
}
impl<N, P, V> ServiceData for DaMempoolService<N, P, V>
impl<N, P> ServiceData for DaMempoolService<N, P>
where
N: NetworkAdapter<Key = P::Key>,
N::Payload: Certificate + Debug + Into<P::Item> + 'static,
N::Payload: DispersedBlobInfo + Debug + Into<P::Item> + 'static,
P: MemPool,
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
V: MempoolVerificationProvider<
Payload = N::Payload,
Parameters = <N::Payload as Certificate>::VerificationParameters,
>,
{
const SERVICE_ID: ServiceId = "mempool-da";
type Settings = DaMempoolSettings<P::Settings, N::Settings, V::Settings>;
type Settings = DaMempoolSettings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<
@ -79,23 +69,16 @@ where
}
#[async_trait::async_trait]
impl<N, P, V> ServiceCore for DaMempoolService<N, P, V>
impl<N, P> ServiceCore for DaMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
V::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Send + Debug + 'static,
N::Payload: Certificate + Into<P::Item> + Clone + Debug + Send + 'static,
N::Payload: DispersedBlobInfo + Into<P::Item> + Clone + Debug + Send + 'static,
N: NetworkAdapter<Key = P::Key> + Send + Sync + 'static,
V: MempoolVerificationProvider<
Payload = N::Payload,
Parameters = <N::Payload as Certificate>::VerificationParameters,
> + Send
+ Sync
+ 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -111,7 +94,6 @@ where
service_state,
network_relay,
pool: P::new(settings.backend),
verification_provider: V::new(settings.verification_provider),
// #[cfg(feature = "metrics")]
// metrics,
})
@ -148,12 +130,10 @@ where
Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
}
Some((key, item)) = network_items.next() => {
let params = self.verification_provider.get_parameters(&item).await;
if item.verify(params) {
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
// TODO: Inform interested parties (DA Sampling) about the new blob.
pool.add_item(key, item).unwrap_or_else(|e| {
tracing::debug!("could not add item to the pool due to: {}", e)
});
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
@ -166,7 +146,7 @@ where
}
}
impl<N, P, V> DaMempoolService<N, P, V>
impl<N, P> DaMempoolService<N, P>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
@ -174,14 +154,8 @@ where
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Debug + Send + 'static,
N::Payload: Certificate + Into<P::Item> + Debug + Clone + Send + 'static,
N::Payload: DispersedBlobInfo + Into<P::Item> + Debug + Clone + Send + 'static,
N: NetworkAdapter<Key = P::Key> + Send + Sync + 'static,
V: MempoolVerificationProvider<
Payload = N::Payload,
Parameters = <N::Payload as Certificate>::VerificationParameters,
> + Send
+ Sync
+ 'static,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
@ -274,9 +248,8 @@ where
}
#[derive(Clone, Debug)]
pub struct DaMempoolSettings<B, N, V> {
pub struct DaMempoolSettings<B, N> {
pub backend: B,
pub network: N,
pub verification_provider: V,
pub registry: Option<NomosRegistry>,
}

View File

@ -1,22 +0,0 @@
use full_replication::{Certificate, CertificateVerificationParameters};
use crate::verify::MempoolVerificationProvider;
pub struct DaVerificationProvider {
settings: CertificateVerificationParameters,
}
#[async_trait::async_trait]
impl MempoolVerificationProvider for DaVerificationProvider {
type Payload = Certificate;
type Parameters = CertificateVerificationParameters;
type Settings = CertificateVerificationParameters;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn get_parameters(&self, _: &Self::Payload) -> Self::Parameters {
self.settings.clone()
}
}

View File

@ -1,22 +0,0 @@
use kzgrs_backend::dispersal::{Certificate, CertificateVerificationParameters};
use crate::verify::MempoolVerificationProvider;
pub struct DaVerificationProvider {
settings: CertificateVerificationParameters,
}
#[async_trait::async_trait]
impl MempoolVerificationProvider for DaVerificationProvider {
type Payload = Certificate;
type Parameters = CertificateVerificationParameters;
type Settings = CertificateVerificationParameters;
fn new(settings: Self::Settings) -> Self {
Self { settings }
}
async fn get_parameters(&self, _: &Self::Payload) -> Self::Parameters {
self.settings.clone()
}
}

View File

@ -1,2 +0,0 @@
pub mod fullreplication;
pub mod kzgrs;

View File

@ -7,7 +7,7 @@ use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, Node};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings, TimeConfig};
use cryptarchia_ledger::{Coin, LedgerState};
use kzgrs_backend::dispersal::Certificate;
use kzgrs_backend::dispersal::BlobInfo;
#[cfg(feature = "mixnet")]
use mixnet::{
address::NodeAddress,
@ -115,7 +115,7 @@ impl NomosNode {
}
}
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, Certificate>> {
pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, BlobInfo>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.header("Content-Type", "application/json")
@ -123,7 +123,7 @@ impl NomosNode {
.send()
.await
.unwrap()
.json::<Option<Block<Tx, Certificate>>>()
.json::<Option<Block<Tx, BlobInfo>>>()
.await
.unwrap()
}