diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index bb1f97d7..e8db37d8 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -25,6 +25,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-consensus = { path = "../../nomos-services/consensus" } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } +nomos-da = { path = "../../nomos-services/data-availability" } metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" consensus-engine = { path = "../../consensus-engine" } @@ -34,7 +35,7 @@ serde_yaml = "0.9" color-eyre = "0.6.0" serde = "1" waku-bindings = { version = "0.1.1", optional = true } - +full-replication = { path = "../../nomos-da/full-replication" } [features] default = ["libp2p"] diff --git a/nodes/nomos-node/src/blob.rs b/nodes/nomos-node/src/blob.rs deleted file mode 100644 index 5860973b..00000000 --- a/nodes/nomos-node/src/blob.rs +++ /dev/null @@ -1 +0,0 @@ -pub type Blob = Box<[u8]>; diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index e967b323..96885378 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -5,6 +5,8 @@ use std::{ }; use crate::Carnot; +#[cfg(feature = "libp2p")] +use crate::DataAvailability; use clap::{Parser, ValueEnum}; use color_eyre::eyre::{self, eyre, Result}; use hex::FromHex; @@ -129,6 +131,8 @@ pub struct Config { pub consensus: ::Settings, #[cfg(feature = "metrics")] pub metrics: > as ServiceData>::Settings, + #[cfg(feature = "libp2p")] + pub da: ::Settings, } impl Config { diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 2a886b43..a1cac91b 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -1,9 +1,11 @@ -mod blob; mod config; mod tx; use color_eyre::eyre::Result; use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin}; +use full_replication::Blob; +#[cfg(feature = "libp2p")] +use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication}; #[cfg(feature = "metrics")] use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; #[cfg(feature = "libp2p")] @@ -11,6 +13,11 @@ use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2p #[cfg(feature = "waku")] use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter; use nomos_consensus::CarnotConsensus; +#[cfg(feature = "libp2p")] +use nomos_da::{ + backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, + DataAvailabilityService, +}; use nomos_http::backends::axum::AxumBackend; use nomos_http::bridge::HttpBridgeService; use nomos_http::http::HttpService; @@ -28,7 +35,6 @@ use nomos_network::NetworkService; use overwatch_derive::*; use overwatch_rs::services::handle::ServiceHandle; -use crate::blob::Blob; pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; pub use tx::Tx; @@ -53,6 +59,13 @@ pub type Carnot = CarnotConsensus< Blob, >; +#[cfg(feature = "libp2p")] +type DataAvailability = DataAvailabilityService< + FullReplication>, + BlobCache<::Hash, Blob>, + DaLibp2pAdapter, +>; + #[derive(Services)] pub struct Nomos { logging: ServiceHandle, @@ -69,4 +82,6 @@ pub struct Nomos { bridges: ServiceHandle, #[cfg(feature = "metrics")] metrics: ServiceHandle>>, + #[cfg(feature = "libp2p")] + da: ServiceHandle, } diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index efad7d7b..75b3042f 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -83,6 +83,8 @@ fn main() -> Result<()> { bridges: HttpBridgeSettings { bridges }, #[cfg(feature = "metrics")] metrics: config.metrics, + #[cfg(feature = "libp2p")] + da: config.da, }, None, ) diff --git a/nomos-core/src/da/mod.rs b/nomos-core/src/da/mod.rs index f217553b..66321b11 100644 --- a/nomos-core/src/da/mod.rs +++ b/nomos-core/src/da/mod.rs @@ -13,7 +13,10 @@ pub trait DaProtocol { type Blob: Blob; type Attestation: Attestation; type Certificate: Certificate; + type Settings: Clone; + // Construct a new instance + fn new(settings: Self::Settings) -> Self; /// Encode bytes into blobs fn encode>(&self, data: T) -> Vec; /// Feed a blob for decoding. diff --git a/nomos-da/full-replication/src/lib.rs b/nomos-da/full-replication/src/lib.rs index ad712f1d..033d7375 100644 --- a/nomos-da/full-replication/src/lib.rs +++ b/nomos-da/full-replication/src/lib.rs @@ -59,6 +59,11 @@ impl AbsoluteNumber { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Settings { + pub num_attestations: usize, +} + impl CertificateStrategy for AbsoluteNumber { type Attestation = Attestation; type Certificate = Certificate; @@ -79,7 +84,7 @@ impl CertificateStrategy for AbsoluteNumber { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)] pub struct Blob { data: Bytes, @@ -130,6 +135,11 @@ impl DaProtocol for FullReplication> { type Blob = Blob; type Attestation = Attestation; type Certificate = Certificate; + type Settings = Settings; + + fn new(settings: Self::Settings) -> Self { + Self::new(AbsoluteNumber::new(settings.num_attestations)) + } fn encode>(&self, data: T) -> Vec { vec![Blob { diff --git a/nomos-services/data-availability/src/backend/memory_cache.rs b/nomos-services/data-availability/src/backend/memory_cache.rs index 94d71d34..1935a243 100644 --- a/nomos-services/data-availability/src/backend/memory_cache.rs +++ b/nomos-services/data-availability/src/backend/memory_cache.rs @@ -1,12 +1,13 @@ use crate::backend::{DaBackend, DaError}; use moka::future::{Cache, CacheBuilder}; use nomos_core::da::blob::Blob; +use serde::{Deserialize, Serialize}; use std::time::Duration; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct BlobCacheSettings { - max_capacity: usize, - evicting_period: Duration, + pub max_capacity: usize, + pub evicting_period: Duration, } pub struct BlobCache(Cache); diff --git a/nomos-services/data-availability/src/backend/mod.rs b/nomos-services/data-availability/src/backend/mod.rs index ecae876f..3a703a3d 100644 --- a/nomos-services/data-availability/src/backend/mod.rs +++ b/nomos-services/data-availability/src/backend/mod.rs @@ -1,4 +1,4 @@ -mod memory_cache; +pub mod memory_cache; use nomos_core::da::blob::Blob; use overwatch_rs::DynError; diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index 42c647a0..076a8002 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -1,31 +1,34 @@ -mod backend; -mod network; +pub mod backend; +pub mod network; // std use overwatch_rs::DynError; use std::fmt::{Debug, Formatter}; // crates use futures::StreamExt; +use serde::{Deserialize, Serialize}; use tokio::sync::oneshot::Sender; // internal use crate::backend::{DaBackend, DaError}; use crate::network::NetworkAdapter; -use nomos_core::da::blob::Blob; +use nomos_core::da::{blob::Blob, DaProtocol}; use nomos_network::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; -pub struct DataAvailabilityService +pub struct DataAvailabilityService where - B: DaBackend, - B::Blob: 'static, - N: NetworkAdapter, + Protocol: DaProtocol, + Backend: DaBackend, + Backend::Blob: 'static, + Network: NetworkAdapter, { service_state: ServiceStateHandle, - backend: B, - network_relay: Relay>, + backend: Backend, + da: Protocol, + network_relay: Relay>, } pub enum DaMsg { @@ -52,36 +55,42 @@ impl Debug for DaMsg { impl RelayMessage for DaMsg {} -impl ServiceData for DataAvailabilityService +impl ServiceData for DataAvailabilityService where - B: DaBackend, - B::Blob: 'static, - N: NetworkAdapter, + Protocol: DaProtocol, + Backend: DaBackend, + Backend::Blob: 'static, + Network: NetworkAdapter, { const SERVICE_ID: ServiceId = "DA"; - type Settings = B::Settings; + type Settings = Settings; type State = NoState; type StateOperator = NoOperator; - type Message = DaMsg; + type Message = DaMsg; } #[async_trait::async_trait] -impl ServiceCore for DataAvailabilityService +impl ServiceCore for DataAvailabilityService where - B: DaBackend + Send + Sync, - B::Settings: Clone + Send + Sync + 'static, - B::Blob: Send, - ::Hash: Debug + Send + Sync, - // TODO: Reply type must be piped together, for now empty array. - N: NetworkAdapter + Send + Sync, + Protocol: DaProtocol + Send + Sync, + Backend: DaBackend + Send + Sync, + Protocol::Settings: Clone + Send + Sync + 'static, + Backend::Settings: Clone + Send + Sync + 'static, + Protocol::Blob: Send, + Protocol::Attestation: Send, + ::Hash: Debug + Send + Sync, + Network: + NetworkAdapter + Send + Sync, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); - let backend_settings = service_state.settings_reader.get_updated_settings(); - let backend = B::new(backend_settings); + let settings = service_state.settings_reader.get_updated_settings(); + let backend = Backend::new(settings.backend); + let da = Protocol::new(settings.da_protocol); Ok(Self { service_state, backend, + da, network_relay, }) } @@ -90,6 +99,7 @@ where let Self { mut service_state, mut backend, + mut da, network_relay, } = self; @@ -98,12 +108,12 @@ where .await .expect("Relay connection with NetworkService should succeed"); - let adapter = N::new(network_relay).await; + let adapter = Network::new(network_relay).await; let mut network_blobs = adapter.blob_stream().await; loop { tokio::select! { Some(blob) = network_blobs.next() => { - if let Err(e) = handle_new_blob(&mut backend, &adapter, blob).await { + if let Err(e) = handle_new_blob(&mut da, &mut backend, &adapter, blob).await { tracing::debug!("Failed to add a new received blob: {e:?}"); } } @@ -118,17 +128,25 @@ where } async fn handle_new_blob< - B: DaBackend, - A: NetworkAdapter, + Protocol: DaProtocol, + Backend: DaBackend, + A: NetworkAdapter, >( - backend: &mut B, + da: &mut Protocol, + backend: &mut Backend, adapter: &A, - blob: B::Blob, + blob: Protocol::Blob, ) -> Result<(), DaError> { // we need to handle the reply (verification + signature) + let attestation = da.attest(&blob); backend.add_blob(blob).await?; + // we do not call `da.recv_blob` here because that is meant to + // be called to retrieve the original data, while here we're only interested + // in storing the blob. + // We might want to refactor the backend to be part of implementations of the + // Da protocol instead of this service and clear this confusion. adapter - .send_attestation([0u8; 32]) + .send_attestation(attestation) .await .map_err(DaError::Dyn) } @@ -157,3 +175,9 @@ where } Ok(()) } + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct Settings { + pub da_protocol: P, + pub backend: B, +} diff --git a/nomos-services/data-availability/src/network/mod.rs b/nomos-services/data-availability/src/network/mod.rs index d6b783d6..360a63da 100644 --- a/nomos-services/data-availability/src/network/mod.rs +++ b/nomos-services/data-availability/src/network/mod.rs @@ -1,4 +1,4 @@ -mod adapters; +pub mod adapters; // std // crates diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 1ab4c4db..9c3c1fdd 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -14,13 +14,15 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" nomos-core = { path = "../nomos-core" } consensus-engine = { path = "../consensus-engine", features = ["serde"] } nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] } +nomos-da = { path = "../nomos-services/data-availability" } +full-replication = { path = "../nomos-da/full-replication" } mixnode = { path = "../nodes/mixnode" } mixnet-node = { path = "../mixnet/node" } mixnet-client = { path = "../mixnet/client" } mixnet-topology = { path = "../mixnet/topology" } # Using older versions, since `mixnet-*` crates depend on `rand` v0.7.3. rand = "0.7.3" -rand_xoshiro = "0.4" +rand_xoshiro = "0.6" once_cell = "1" secp256k1 = { version = "0.26", features = ["rand"] } waku-bindings = { version = "0.1.1", optional = true } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 3e75d6f1..4c7f2ed9 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -282,6 +282,16 @@ fn create_node_config( }, #[cfg(feature = "metrics")] metrics: Default::default(), + #[cfg(feature = "libp2p")] + da: nomos_da::Settings { + da_protocol: full_replication::Settings { + num_attestations: 1, + }, + backend: nomos_da::backend::memory_cache::BlobCacheSettings { + max_capacity: usize::MAX, + evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day + }, + }, }; #[cfg(feature = "waku")] {