Base cleaning of da to new traits/structure

Added new da protocols and types
This commit is contained in:
danielsanchezq 2024-03-25 16:13:21 +01:00 committed by Gusto
parent 7e4d00cc78
commit 3a06e8f8d8
30 changed files with 124 additions and 874 deletions

View File

@ -14,7 +14,8 @@ members = [
"nomos-da/reed-solomon",
"nomos-da/kzg",
"nomos-da/full-replication",
"nomos-cli",
# TODO: add it again and reimplement full replication
# "nomos-cli",
"nomos-utils",
"nodes/nomos-node",
"mixnet",

View File

@ -20,7 +20,7 @@ use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use full_replication::{Blob, Certificate};
use nomos_core::{da::blob, header::HeaderId, tx::Transaction};
use nomos_core::{header::HeaderId, tx::Transaction};
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, openapi::Status,
MempoolMetrics,
@ -29,9 +29,10 @@ use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_storage::backends::StorageSerde;
use nomos_api::{
http::{cl, consensus, da, libp2p, mempool, metrics, storage},
http::{cl, consensus, libp2p, mempool, metrics, storage},
Backend,
};
use nomos_core::da::certificate;
/// Configuration for the Http Server
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
@ -51,8 +52,6 @@ pub struct AxumBackend<T, S, const SIZE: usize> {
#[derive(OpenApi)]
#[openapi(
paths(
da_metrics,
da_status,
),
components(
schemas(Status<HeaderId>, MempoolMetrics)
@ -117,9 +116,6 @@ where
)
.layer(TraceLayer::new_for_http())
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/da/metrics", routing::get(da_metrics))
.route("/da/status", routing::post(da_status))
.route("/da/blobs", routing::post(da_blobs))
.route("/cl/metrics", routing::get(cl_metrics::<T>))
.route("/cl/status", routing::post(cl_status::<T>))
.route(
@ -133,7 +129,6 @@ where
.route("/network/info", routing::get(libp2p_info))
.route("/storage/block", routing::post(block::<S, T>))
.route("/mempool/add/tx", routing::post(add_tx::<T>))
.route("/mempool/add/cert", routing::post(add_cert))
.route("/metrics", routing::get(get_metrics))
.with_state(handle);
@ -158,48 +153,6 @@ macro_rules! make_request_and_return_response {
}};
}
#[utoipa::path(
get,
path = "/da/metrics",
responses(
(status = 200, description = "Get the mempool metrics of the da service", body = MempoolMetrics),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_metrics(State(handle): State<OverwatchHandle>) -> Response {
make_request_and_return_response!(da::da_mempool_metrics(&handle))
}
#[utoipa::path(
post,
path = "/da/status",
responses(
(status = 200, description = "Query the mempool status of the da service", body = Vec<<Blob as blob::Blob>::Hash>),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_status(
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_mempool_status(&handle, items))
}
#[utoipa::path(
post,
path = "/da/blobs",
responses(
(status = 200, description = "Get pending blobs", body = Vec<Blob>),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn da_blobs(
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da::da_blobs(&handle, items))
}
#[utoipa::path(
get,
path = "/cl/metrics",
@ -360,31 +313,6 @@ where
>(&handle, tx, Transaction::hash))
}
#[utoipa::path(
post,
path = "/mempool/add/cert",
responses(
(status = 200, description = "Add certificate to the mempool"),
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_cert(
State(handle): State<OverwatchHandle>,
Json(cert): Json<Certificate>,
) -> Response {
make_request_and_return_response!(mempool::add::<
NetworkBackend,
MempoolNetworkAdapter<Certificate, <Blob as blob::Blob>::Hash>,
nomos_mempool::Certificate,
Certificate,
<Blob as blob::Blob>::Hash,
>(
&handle,
cert,
nomos_core::da::certificate::Certificate::hash
))
}
#[utoipa::path(
get,
path = "/metrics",

View File

@ -4,11 +4,9 @@ use std::{
};
use crate::api::AxumBackend;
use crate::DataAvailability;
use crate::{Tx, Wire, MB16};
use clap::{Parser, ValueEnum};
use color_eyre::eyre::{eyre, Result};
use hex::FromHex;
use nomos_api::ApiService;
use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
@ -85,12 +83,6 @@ pub struct CryptarchiaArgs {
slot_duration: Option<u64>,
}
#[derive(Parser, Debug, Clone)]
pub struct DaArgs {
#[clap(long = "da-voter", env = "DA_VOTER")]
da_voter: Option<String>,
}
#[derive(Parser, Debug, Clone)]
pub struct MetricsArgs {
#[clap(long = "with-metrics", env = "WITH_METRICS")]
@ -103,7 +95,6 @@ pub struct Config {
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
pub da: <DataAvailability as ServiceData>::Settings,
}
impl Config {
@ -217,15 +208,4 @@ impl Config {
Ok(self)
}
pub fn update_da(mut self, da_args: DaArgs) -> Result<Self> {
let DaArgs { da_voter } = da_args;
if let Some(voter) = da_voter {
let bytes = <[u8; 32]>::from_hex(voter)?;
self.da.da_protocol.voter = bytes;
}
Ok(self)
}
}

View File

@ -10,18 +10,9 @@ use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsServic
use api::AxumBackend;
use bytes::Bytes;
pub use config::{Config, CryptarchiaArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs};
use nomos_api::ApiService;
use nomos_core::{
da::{blob, certificate},
header::HeaderId,
tx::Transaction,
wire,
};
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
DataAvailabilityService,
};
use nomos_core::{da::certificate, header::HeaderId, tx::Transaction, wire};
use nomos_log::Logger;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{
@ -58,26 +49,13 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, Certificate>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MempoolNetworkAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MockPool<HeaderId, Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, Certificate>,
RocksBackend<Wire>,
>;
pub type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
DaNetworkAdapter<Blob, Attestation>,
>;
type Mempool<K, V, D> = MempoolService<MempoolNetworkAdapter<K, V>, MockPool<HeaderId, K, V>, D>;
#[derive(Services)]
@ -86,15 +64,10 @@ pub struct Nomos {
network: ServiceHandle<NetworkService<NetworkBackend>>,
cl_mempool: ServiceHandle<Mempool<Tx, <Tx as Transaction>::Hash, TxDiscriminant>>,
da_mempool: ServiceHandle<
Mempool<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
CertDiscriminant,
>,
Mempool<Certificate, <Certificate as certificate::Certificate>::Id, CertDiscriminant>,
>,
cryptarchia: ServiceHandle<Cryptarchia>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
da: ServiceHandle<DataAvailability>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,

View File

@ -2,16 +2,13 @@ use full_replication::{Blob, Certificate};
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use nomos_node::{
Config, CryptarchiaArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos,
Config, CryptarchiaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos,
NomosServiceSettings, Tx,
};
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_core::{
da::{blob, certificate},
tx::Transaction,
};
use nomos_core::{da::certificate, tx::Transaction};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
@ -35,9 +32,6 @@ struct Args {
http_args: HttpArgs,
#[clap(flatten)]
cryptarchia_args: CryptarchiaArgs,
/// Overrides da config.
#[clap(flatten)]
da_args: DaArgs,
/// Overrides metrics config.
#[clap(flatten)]
metrics_args: MetricsArgs,
@ -46,7 +40,6 @@ struct Args {
fn main() -> Result<()> {
let Args {
config,
da_args,
log_args,
http_args,
network_args,
@ -54,7 +47,6 @@ fn main() -> Result<()> {
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_da(da_args)?
.update_log(log_args)?
.update_http(http_args)?
.update_network(network_args)?
@ -92,7 +84,6 @@ fn main() -> Result<()> {
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
da: config.da,
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
read_only: false,
@ -107,7 +98,7 @@ fn main() -> Result<()> {
Ok(())
}
fn cert_id(cert: &Certificate) -> <Blob as blob::Blob>::Hash {
fn cert_id(cert: &Certificate) -> <Certificate as certificate::Certificate>::Id {
use certificate::Certificate;
cert.hash()
cert.id()
}

View File

@ -1,18 +1,4 @@
use super::CLIENT;
use full_replication::Blob;
use nomos_core::da::blob;
use full_replication::Certificate;
use nomos_core::da::certificate;
use reqwest::Url;
pub async fn get_blobs(
node: &Url,
ids: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Blob>, reqwest::Error> {
const BLOBS_PATH: &str = "da/blobs";
CLIENT
.post(node.join(BLOBS_PATH).unwrap())
.json(&ids)
.send()
.await?
.json()
.await
}

View File

@ -6,19 +6,19 @@ mod ui;
use crate::{
api::consensus::get_headers_info,
da::{
disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
},
retrieve::get_block_blobs,
},
//da::{
// disseminate::{
// DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
// },
// retrieve::get_block_blobs,
//},
};
use clap::Args;
use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use futures::{stream, StreamExt};
use nomos_core::{da::DaProtocol, header::HeaderId, wire};
use nomos_core::{header::HeaderId, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_network::NetworkService;

View File

@ -1,21 +1,22 @@
use clap::Subcommand;
pub mod chat;
pub mod disseminate;
// pub mod chat;
// pub mod disseminate;
#[derive(Debug, Subcommand)]
pub enum Command {
/// Send a blob to the network and collect attestations to create a DA proof
Disseminate(disseminate::Disseminate),
/// (Almost) Instant messaging protocol on top of the Nomos network
Chat(chat::NomosChat),
// /// Send a blob to the network and collect attestations to create a DA proof
// Disseminate(disseminate::Disseminate),
// /// (Almost) Instant messaging protocol on top of the Nomos network
// Chat(chat::NomosChat),
}
impl Command {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
match self {
Command::Disseminate(cmd) => cmd.run(),
Command::Chat(cmd) => cmd.run(),
}
// match self {
// Command::Disseminate(cmd) => cmd.run(),
// Command::Chat(cmd) => cmd.run(),
// }
Ok(())
}
}

View File

@ -3,7 +3,7 @@ use clap::{Args, ValueEnum};
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication, Voter};
use futures::StreamExt;
use hex::FromHex;
use nomos_core::{da::DaProtocol, wire};
use nomos_core::wire;
use nomos_da::network::{adapters::libp2p::Libp2pAdapter as DaNetworkAdapter, NetworkAdapter};
use nomos_log::Logger;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;

View File

@ -1,6 +1,6 @@
pub mod api;
pub mod cmds;
pub mod da;
// pub mod da;
use clap::Parser;
use cmds::Command;

View File

@ -1,11 +1,7 @@
use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;
pub trait Attestation {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
type Signature;
fn attestation_signature(&self) -> Self::Signature;
}

View File

@ -1,13 +0,0 @@
use bytes::Bytes;
use std::hash::Hash;
pub type BlobHasher<T> = fn(&T) -> <T as Blob>::Hash;
pub trait Blob {
const HASHER: BlobHasher<Self>;
type Hash: Hash + Eq + Clone;
fn hash(&self) -> Self::Hash {
Self::HASHER(self)
}
fn as_bytes(&self) -> Bytes;
}

View File

@ -1,15 +1,16 @@
pub mod select;
use crate::da::blob::Blob;
use bytes::Bytes;
use std::hash::Hash;
pub trait Certificate {
type Blob: Blob;
type Hash: Hash + Eq + Clone;
fn blob(&self) -> <Self::Blob as Blob>::Hash;
fn hash(&self) -> Self::Hash;
fn as_bytes(&self) -> Bytes;
type VerificationParameters;
type Signature;
type Id;
fn signers(&self) -> Vec<bool>;
fn signature(&self) -> Self::Signature;
fn id(&self) -> Self::Id;
fn verify(&self, authorization_parameters: Self::VerificationParameters) -> bool;
}
pub trait BlobCertificateSelect {

View File

@ -32,7 +32,7 @@ impl<const SIZE: usize, C: Certificate> BlobCertificateSelect for FillSize<SIZE,
certificates: I,
) -> impl Iterator<Item = Self::Certificate> + 'i {
utils::select::select_from_till_fill_size::<SIZE, Self::Certificate>(
|blob| blob.as_bytes().len(),
|blob| SIZE,
certificates,
)
}

View File

@ -0,0 +1,6 @@
use super::certificate::Certificate;
pub trait CertificateExtension {
type Extension;
fn extension(&self) -> Self::Extension;
}

View File

@ -1,40 +1,26 @@
use std::error::Error;
// crates
use bytes::Bytes;
// internal
use crate::da::attestation::Attestation;
use crate::da::blob::Blob;
use crate::da::certificate::Certificate;
pub mod attestation;
pub mod blob;
pub mod certificate;
pub mod certificate_metadata;
pub mod vid;
pub trait DaProtocol {
type Blob: Blob;
type Attestation: Attestation;
type Certificate: Certificate;
type Settings: Clone;
// Construct a new instance
fn new(settings: Self::Settings) -> Self;
/// Encode bytes into blobs
fn encode<T: AsRef<[u8]>>(&self, data: T) -> Vec<Self::Blob>;
/// Feed a blob for decoding.
/// Depending on the protocol, it may be necessary to feed multiple blobs to
/// recover the initial data.
fn recv_blob(&mut self, blob: Self::Blob);
/// Attempt to recover the initial data from fed blobs.
/// If the protocol is not yet ready to return the data, return None.
fn extract(&mut self) -> Option<Bytes>;
/// Attest that we have received and stored a blob.
fn attest(&self, blob: &Self::Blob) -> Self::Attestation;
/// Validate that an attestation is valid for a blob.
fn validate_attestation(&self, blob: &Self::Blob, attestation: &Self::Attestation) -> bool;
/// Buffer attestations to produce a certificate of correct dispersal.
fn recv_attestation(&mut self, attestation: Self::Attestation);
/// Attempt to produce a certificate of correct disperal for a blob.
/// If the protocol is not yet ready to return the certificate, return None.
fn certify_dispersal(&mut self) -> Option<Self::Certificate>;
/// Validate a certificate.
fn validate_certificate(&self, certificate: &Self::Certificate) -> bool;
pub trait DaEncoder {
type EncodedData;
fn encode(b: &[u8]) -> Result<Self::EncodedData, impl Error>;
}
pub trait DaVerifier {
type DaBlob;
type Attestation;
fn verify(&self, blob: Self::DaBlob) -> Result<Self::Attestation, impl Error>;
}
pub trait DaDispersal {
type EncodedData;
type Certificate;
fn disperse(&self, encoded_data: Self::EncodedData) -> Result<Self::Certificate, impl Error>;
}

7
nomos-core/src/da/vid.rs Normal file
View File

@ -0,0 +1,7 @@
use crate::da::certificate_metadata::CertificateExtension;
pub trait VID: CertificateExtension {
type CertificateId;
fn certificate_id(&self) -> Self::CertificateId;
}

View File

@ -1,8 +1,7 @@
// internal
use nomos_core::da::{
attestation::{self, Attestation as _},
blob::{self, BlobHasher},
certificate, DaProtocol,
certificate,
};
// std
use std::collections::HashSet;
@ -13,6 +12,7 @@ use blake2::{
Blake2bVar,
};
use bytes::Bytes;
use nomos_core::da::certificate_metadata::CertificateExtension;
use nomos_core::wire;
use serde::{Deserialize, Serialize};
@ -111,15 +111,6 @@ fn hasher(blob: &Blob) -> [u8; 32] {
output
}
impl blob::Blob for Blob {
const HASHER: BlobHasher<Self> = hasher as BlobHasher<Self>;
type Hash = [u8; 32];
fn as_bytes(&self) -> bytes::Bytes {
self.data.clone()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Attestation {
@ -128,22 +119,11 @@ pub struct Attestation {
}
impl attestation::Attestation for Attestation {
type Blob = Blob;
type Hash = [u8; 32];
type Signature = [u8; 32];
fn blob(&self) -> [u8; 32] {
self.blob
}
fn hash(&self) -> <Self::Blob as blob::Blob>::Hash {
fn attestation_signature(&self) -> Self::Signature {
hash([self.blob, self.voter].concat())
}
fn as_bytes(&self) -> Bytes {
wire::serialize(self)
.expect("Attestation shouldn't fail to be serialized")
.into()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
@ -154,92 +134,54 @@ pub struct Certificate {
impl Hash for Certificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(certificate::Certificate::as_bytes(self).as_ref());
state.write(<Certificate as certificate::Certificate>::id(self).as_ref());
}
}
impl certificate::Certificate for Certificate {
type Blob = Blob;
type Hash = [u8; 32];
pub struct CertificateVerificationParameters {
threshold: usize,
}
fn blob(&self) -> <Self::Blob as blob::Blob>::Hash {
self.attestations[0].blob
impl certificate::Certificate for Certificate {
type VerificationParameters = CertificateVerificationParameters;
type Signature = [u8; 32];
type Id = [u8; 32];
fn signers(&self) -> Vec<bool> {
self.attestations.iter().map(|_| true).collect()
}
fn hash(&self) -> <Self::Blob as blob::Blob>::Hash {
fn signature(&self) -> Self::Signature {
let signatures: Vec<u8> = self
.attestations
.iter()
.map(|attestation| attestation.attestation_signature())
.flatten()
.collect();
hash(signatures)
}
fn id(&self) -> Self::Id {
let mut input = self
.attestations
.iter()
.map(|a| a.hash())
.map(|a| a.attestation_signature())
.collect::<Vec<_>>();
// sort to make the hash deterministic
input.sort();
hash(input.concat())
}
fn as_bytes(&self) -> Bytes {
wire::serialize(self)
.expect("Certificate shouldn't fail to be serialized")
.into()
fn verify(&self, authorization_parameters: Self::VerificationParameters) -> bool {
authorization_parameters.threshold <= self.attestations.len()
}
}
// TODO: add generic impl when the trait for Certificate is expanded
impl DaProtocol for FullReplication<AbsoluteNumber<Attestation, Certificate>> {
type Blob = Blob;
type Attestation = Attestation;
type Certificate = Certificate;
type Settings = Settings;
impl CertificateExtension for Certificate {
type Extension = ();
fn new(settings: Self::Settings) -> Self {
Self::new(
settings.voter,
AbsoluteNumber::new(settings.num_attestations),
)
}
fn encode<T: AsRef<[u8]>>(&self, data: T) -> Vec<Self::Blob> {
vec![Blob {
data: Bytes::copy_from_slice(data.as_ref()),
}]
}
fn recv_blob(&mut self, blob: Self::Blob) {
self.output_buffer.push(blob.data);
}
fn extract(&mut self) -> Option<Bytes> {
self.output_buffer.pop()
}
fn attest(&self, blob: &Self::Blob) -> Self::Attestation {
Attestation {
blob: hasher(blob),
voter: self.voter,
}
}
fn validate_attestation(&self, blob: &Self::Blob, attestation: &Self::Attestation) -> bool {
hasher(blob) == attestation.blob
}
fn recv_attestation(&mut self, attestation: Self::Attestation) {
self.attestations.push(attestation);
if self.certificate_strategy.can_build(&self.attestations) {
self.output_certificate_buf.push(
self.certificate_strategy
.build(std::mem::take(&mut self.attestations)),
);
}
}
fn certify_dispersal(&mut self) -> Option<Self::Certificate> {
self.output_certificate_buf.pop()
}
fn validate_certificate(&self, certificate: &Self::Certificate) -> bool {
self.certificate_strategy
.can_build(&certificate.attestations)
fn extension(&self) -> Self::Extension {
()
}
}

View File

@ -11,10 +11,7 @@ use cryptarchia_consensus::{
};
use full_replication::Certificate;
use nomos_core::{
da::{
blob,
certificate::{self, select::FillSize as FillSizeWithBlobsCertificate},
},
da::certificate::{self, select::FillSize as FillSizeWithBlobsCertificate},
header::HeaderId,
tx::{select::FillSize as FillSizeWithTx, Transaction},
};
@ -27,15 +24,8 @@ pub type Cryptarchia<Tx, SS, const SIZE: usize> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, Certificate>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<
HeaderId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MempoolNetworkAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MockPool<HeaderId, Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobsCertificate<SIZE, Certificate>,
RocksBackend<SS>,

View File

@ -1,75 +0,0 @@
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
use nomos_core::da::blob;
use nomos_core::header::HeaderId;
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
DaMsg, DataAvailabilityService,
};
use nomos_mempool::{
backend::mockpool::MockPool,
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
openapi::{MempoolMetrics, Status},
Certificate as CertDiscriminant, MempoolMsg, MempoolService,
};
use tokio::sync::oneshot;
pub type DaMempoolService = MempoolService<
MempoolNetworkAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<HeaderId, Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
pub type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
DaNetworkAdapter<Blob, Attestation>,
>;
pub async fn da_mempool_metrics(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<MempoolMetrics, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}
pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status<HeaderId>>, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Status {
items,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}
pub async fn da_blobs(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
ids: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Blob>, super::DynError> {
let relay = handle.relay::<DataAvailability>().connect().await?;
let (reply_channel, receiver) = oneshot::channel();
relay
.send(DaMsg::Get {
ids: Box::new(ids.into_iter()),
reply_channel,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -1,7 +1,6 @@
pub type DynError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub mod cl;
pub mod consensus;
pub mod da;
pub mod libp2p;
pub mod mempool;
pub mod metrics;

View File

@ -8,6 +8,7 @@ use cryptarchia_ledger::{Coin, LeaderProof, LedgerState};
use futures::StreamExt;
use network::{messages::NetworkMessage, NetworkAdapter};
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::da::certificate_metadata::CertificateExtension;
use nomos_core::header::{cryptarchia::Header, HeaderId};
use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::{
@ -201,7 +202,9 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate<Hash = DaPool::Key>
// TODO: Change to specific certificate bounds here
DaPool::Item: Certificate<Id = DaPool::Key>
+ CertificateExtension
+ Debug
+ Clone
+ Eq
@ -368,7 +371,8 @@ where
+ Send
+ Sync
+ 'static,
DaPool::Item: Certificate<Hash = DaPool::Key>
DaPool::Item: Certificate<Id = DaPool::Key>
+ CertificateExtension
+ Debug
+ Clone
+ Eq
@ -489,7 +493,7 @@ where
)
.await;
mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await;
mark_in_block(da_mempool_relay, block.blobs().map(Certificate::id), id).await;
// store block
let msg = <StorageMsg<_>>::new_store_message(header.id(), block.clone());

View File

@ -1,68 +0,0 @@
use crate::backend::{DaBackend, DaError};
use moka::future::{Cache, CacheBuilder};
use nomos_core::da::blob::Blob;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct BlobCacheSettings {
pub max_capacity: usize,
pub evicting_period: Duration,
}
pub struct BlobCache<H, B>(Cache<H, B>);
impl<B> BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
pub fn new(settings: BlobCacheSettings) -> Self {
let BlobCacheSettings {
max_capacity,
evicting_period,
} = settings;
let cache = CacheBuilder::new(max_capacity as u64)
.time_to_live(evicting_period)
// can we leverage this to evict really old blobs?
.time_to_idle(evicting_period)
.build();
Self(cache)
}
pub async fn add(&self, blob: B) {
self.0.insert(blob.hash(), blob).await
}
pub async fn remove(&self, hash: &B::Hash) {
self.0.remove(hash).await;
}
}
#[async_trait::async_trait]
impl<B> DaBackend for BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
type Settings = BlobCacheSettings;
type Blob = B;
fn new(settings: Self::Settings) -> Self {
BlobCache::new(settings)
}
async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> {
self.add(blob).await;
Ok(())
}
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError> {
self.remove(blob).await;
Ok(())
}
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob> {
self.0.get(id)
}
}

View File

@ -1,24 +0,0 @@
pub mod memory_cache;
use nomos_core::da::blob::Blob;
use overwatch_rs::DynError;
#[derive(Debug)]
pub enum DaError {
Dyn(DynError),
}
#[async_trait::async_trait]
pub trait DaBackend {
type Settings: Clone;
type Blob: Blob;
fn new(settings: Self::Settings) -> Self;
async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>;
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob>;
}

View File

@ -1,214 +1 @@
pub mod backend;
pub mod network;
// std
use overwatch_rs::DynError;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Sender;
// internal
use crate::backend::{DaBackend, DaError};
use crate::network::NetworkAdapter;
use nomos_core::da::{blob::Blob, DaProtocol};
use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use tracing::error;
pub struct DataAvailabilityService<Protocol, Backend, Network>
where
Protocol: DaProtocol,
Backend: DaBackend<Blob = Protocol::Blob>,
Backend::Blob: 'static,
Network: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
{
service_state: ServiceStateHandle<Self>,
backend: Backend,
da: Protocol,
network_relay: Relay<NetworkService<Network::Backend>>,
}
pub enum DaMsg<B: Blob> {
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
},
Get {
ids: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
reply_channel: Sender<Vec<B>>,
},
}
impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
DaMsg::Get { .. } => {
write!(f, "DaMsg::Get")
}
}
}
}
impl<B: Blob + 'static> RelayMessage for DaMsg<B> {}
impl<Protocol, Backend, Network> ServiceData for DataAvailabilityService<Protocol, Backend, Network>
where
Protocol: DaProtocol,
Backend: DaBackend<Blob = Protocol::Blob>,
Backend::Blob: 'static,
Network: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
{
const SERVICE_ID: ServiceId = "DA";
type Settings = Settings<Protocol::Settings, Backend::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaMsg<Protocol::Blob>;
}
impl<Protocol, Backend, Network> DataAvailabilityService<Protocol, Backend, Network>
where
Protocol: DaProtocol + Send + Sync,
Backend: DaBackend<Blob = Protocol::Blob> + Send + Sync,
Protocol::Settings: Clone + Send + Sync + 'static,
Protocol::Blob: 'static,
Backend::Settings: Clone + Send + Sync + 'static,
Protocol::Blob: Send,
Protocol::Attestation: Send,
<Backend::Blob as Blob>::Hash: Debug + Send + Sync,
Network:
NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
{
async fn handle_new_blob(
da: &Protocol,
backend: &Backend,
adapter: &Network,
blob: Protocol::Blob,
) -> Result<(), DaError> {
// we need to handle the reply (verification + signature)
let attestation = da.attest(&blob);
backend.add_blob(blob).await?;
// we do not call `da.recv_blob` here because that is meant to
// be called to retrieve the original data, while here we're only interested
// in storing the blob.
// We might want to refactor the backend to be part of implementations of the
// Da protocol instead of this service and clear this confusion.
adapter
.send_attestation(attestation)
.await
.map_err(DaError::Dyn)
}
async fn handle_da_msg(backend: &Backend, msg: DaMsg<Backend::Blob>) -> Result<(), DaError> {
match msg {
DaMsg::RemoveBlobs { blobs } => {
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
if let Err(e) = backend.remove_blob(&blob).await {
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
}
})
.await;
}
DaMsg::Get { ids, reply_channel } => {
let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
if reply_channel.send(res).is_err() {
tracing::error!("Could not returns blobs");
}
}
}
Ok(())
}
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
}
#[async_trait::async_trait]
impl<Protocol, Backend, Network> ServiceCore for DataAvailabilityService<Protocol, Backend, Network>
where
Protocol: DaProtocol + Send + Sync,
Backend: DaBackend<Blob = Protocol::Blob> + Send + Sync,
Protocol::Settings: Clone + Send + Sync + 'static,
Backend::Settings: Clone + Send + Sync + 'static,
Protocol::Blob: Send,
Protocol::Attestation: Send,
<Backend::Blob as Blob>::Hash: Debug + Send + Sync,
Network:
NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
let backend = Backend::new(settings.backend);
let da = Protocol::new(settings.da_protocol);
Ok(Self {
service_state,
backend,
da,
network_relay,
})
}
async fn run(self) -> Result<(), DynError> {
let Self {
mut service_state,
backend,
da,
network_relay,
} = self;
let network_relay = network_relay
.connect()
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = Network::new(network_relay).await;
let mut network_blobs = adapter.blob_stream().await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(blob) = network_blobs.next() => {
if let Err(e) = Self::handle_new_blob(&da, &backend, &adapter, blob).await {
tracing::debug!("Failed to add a new received blob: {e:?}");
}
}
Some(msg) = service_state.inbound_relay.recv() => {
if let Err(e) = Self::handle_da_msg(&backend, msg).await {
tracing::debug!("Failed to handle da msg: {e:?}");
}
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Settings<P, B> {
pub da_protocol: P,
pub backend: B,
}

View File

@ -1,113 +0,0 @@
// std
use futures::Stream;
use overwatch_rs::DynError;
use std::marker::PhantomData;
// crates
// internal
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::debug;
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
pub struct Libp2pAdapter<B, A> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
_blob: PhantomData<B>,
_attestation: PhantomData<A>,
}
impl<B, A> Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
async fn stream_for<E: DeserializeOwned>(&self) -> Box<dyn Stream<Item = E> + Unpin + Send> {
let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC);
let (sender, receiver) = tokio::sync::oneshot::channel();
self.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.expect("Network backend should be ready");
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
move |msg| match msg {
Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => {
match wire::deserialize::<E>(&data) {
Ok(msg) => Some(msg),
Err(e) => {
debug!("Unrecognized message: {e}");
None
}
}
}
_ => None,
},
)))
}
async fn send<E: Serialize>(&self, data: E) -> Result<(), DynError> {
let message = wire::serialize(&data)?.into_boxed_slice();
self.network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: NOMOS_DA_TOPIC.to_string(),
message,
}))
.await
.map_err(|(e, _)| Box::new(e) as DynError)
}
}
#[async_trait::async_trait]
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
type Backend = Libp2p;
type Blob = B;
type Attestation = A;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
network_relay
.send(NetworkMsg::Process(Command::Subscribe(
NOMOS_DA_TOPIC.to_string(),
)))
.await
.expect("Network backend should be ready");
Self {
network_relay,
_blob: Default::default(),
_attestation: Default::default(),
}
}
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send> {
self.stream_for::<Self::Blob>().await
}
async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send> {
self.stream_for::<Self::Attestation>().await
}
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> {
self.send(attestation).await
}
async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError> {
self.send(blob).await
}
}

View File

@ -1,2 +0,0 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;

View File

@ -1,33 +0,0 @@
pub mod adapters;
// std
// crates
use futures::Stream;
// internal
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use serde::de::DeserializeOwned;
use serde::Serialize;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static;
type Blob: Serialize + DeserializeOwned + Send + Sync + 'static;
type Attestation: Serialize + DeserializeOwned + Send + Sync + 'static;
async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
async fn attestation_stream(&self) -> Box<dyn Stream<Item = Self::Attestation> + Unpin + Send>;
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
async fn send_blob(&self, blob: Self::Blob) -> Result<(), DynError>;
}

View File

@ -1,5 +1,5 @@
pub mod nodes;
pub use nodes::NomosNode;
// pub use nodes::NomosNode;
use once_cell::sync::Lazy;
use std::env;

View File

@ -1,5 +1,5 @@
pub mod nomos;
pub use nomos::NomosNode;
// pub mod nomos;
// pub use nomos::NomosNode;
use tempfile::TempDir;