Add da service to nomos node (#404)

* Make  the data availability service work with multiple protocols

* Add a generic way to instantiate DaProtocol

Add settings type and a new `new(Self::Settings)` method to
build a new DaProtocol instance

* Add data availability service to node

* fix tests

* fix imports
This commit is contained in:
Giacomo Pasini 2023-09-18 11:43:24 +02:00 committed by GitHub
parent 633b5d6cbf
commit 5e194922c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 113 additions and 42 deletions

View File

@ -25,6 +25,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus" }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
nomos-da = { path = "../../nomos-services/data-availability" }
metrics = { path = "../../nomos-services/metrics", optional = true }
tracing-subscriber = "0.3"
consensus-engine = { path = "../../consensus-engine" }
@ -34,7 +35,7 @@ serde_yaml = "0.9"
color-eyre = "0.6.0"
serde = "1"
waku-bindings = { version = "0.1.1", optional = true }
full-replication = { path = "../../nomos-da/full-replication" }
[features]
default = ["libp2p"]

View File

@ -1 +0,0 @@
pub type Blob = Box<[u8]>;

View File

@ -5,6 +5,8 @@ use std::{
};
use crate::Carnot;
#[cfg(feature = "libp2p")]
use crate::DataAvailability;
use clap::{Parser, ValueEnum};
use color_eyre::eyre::{self, eyre, Result};
use hex::FromHex;
@ -129,6 +131,8 @@ pub struct Config {
pub consensus: <Carnot as ServiceData>::Settings,
#[cfg(feature = "metrics")]
pub metrics: <MetricsService<MapMetricsBackend<MetricsData>> as ServiceData>::Settings,
#[cfg(feature = "libp2p")]
pub da: <DataAvailability as ServiceData>::Settings,
}
impl Config {

View File

@ -1,9 +1,11 @@
mod blob;
mod config;
mod tx;
use color_eyre::eyre::Result;
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
use full_replication::Blob;
#[cfg(feature = "libp2p")]
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
#[cfg(feature = "libp2p")]
@ -11,6 +13,11 @@ use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2p
#[cfg(feature = "waku")]
use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter;
use nomos_consensus::CarnotConsensus;
#[cfg(feature = "libp2p")]
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
DataAvailabilityService,
};
use nomos_http::backends::axum::AxumBackend;
use nomos_http::bridge::HttpBridgeService;
use nomos_http::http::HttpService;
@ -28,7 +35,6 @@ use nomos_network::NetworkService;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use crate::blob::Blob;
pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs};
pub use tx::Tx;
@ -53,6 +59,13 @@ pub type Carnot = CarnotConsensus<
Blob,
>;
#[cfg(feature = "libp2p")]
type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
DaLibp2pAdapter<Blob, Attestation>,
>;
#[derive(Services)]
pub struct Nomos {
logging: ServiceHandle<Logger>,
@ -69,4 +82,6 @@ pub struct Nomos {
bridges: ServiceHandle<HttpBridgeService>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
#[cfg(feature = "libp2p")]
da: ServiceHandle<DataAvailability>,
}

View File

@ -83,6 +83,8 @@ fn main() -> Result<()> {
bridges: HttpBridgeSettings { bridges },
#[cfg(feature = "metrics")]
metrics: config.metrics,
#[cfg(feature = "libp2p")]
da: config.da,
},
None,
)

View File

@ -13,7 +13,10 @@ pub trait DaProtocol {
type Blob: Blob;
type Attestation: Attestation;
type Certificate: Certificate;
type Settings: Clone;
// Construct a new instance
fn new(settings: Self::Settings) -> Self;
/// Encode bytes into blobs
fn encode<T: AsRef<[u8]>>(&self, data: T) -> Vec<Self::Blob>;
/// Feed a blob for decoding.

View File

@ -59,6 +59,11 @@ impl<A, C> AbsoluteNumber<A, C> {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Settings {
pub num_attestations: usize,
}
impl CertificateStrategy for AbsoluteNumber<Attestation, Certificate> {
type Attestation = Attestation;
type Certificate = Certificate;
@ -79,7 +84,7 @@ impl CertificateStrategy for AbsoluteNumber<Attestation, Certificate> {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, Hash, PartialEq)]
pub struct Blob {
data: Bytes,
@ -130,6 +135,11 @@ impl DaProtocol for FullReplication<AbsoluteNumber<Attestation, Certificate>> {
type Blob = Blob;
type Attestation = Attestation;
type Certificate = Certificate;
type Settings = Settings;
fn new(settings: Self::Settings) -> Self {
Self::new(AbsoluteNumber::new(settings.num_attestations))
}
fn encode<T: AsRef<[u8]>>(&self, data: T) -> Vec<Self::Blob> {
vec![Blob {

View File

@ -1,12 +1,13 @@
use crate::backend::{DaBackend, DaError};
use moka::future::{Cache, CacheBuilder};
use nomos_core::da::blob::Blob;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct BlobCacheSettings {
max_capacity: usize,
evicting_period: Duration,
pub max_capacity: usize,
pub evicting_period: Duration,
}
pub struct BlobCache<H, B>(Cache<H, B>);

View File

@ -1,4 +1,4 @@
mod memory_cache;
pub mod memory_cache;
use nomos_core::da::blob::Blob;
use overwatch_rs::DynError;

View File

@ -1,31 +1,34 @@
mod backend;
mod network;
pub mod backend;
pub mod network;
// std
use overwatch_rs::DynError;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Sender;
// internal
use crate::backend::{DaBackend, DaError};
use crate::network::NetworkAdapter;
use nomos_core::da::blob::Blob;
use nomos_core::da::{blob::Blob, DaProtocol};
use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
pub struct DataAvailabilityService<B, N>
pub struct DataAvailabilityService<Protocol, Backend, Network>
where
B: DaBackend,
B::Blob: 'static,
N: NetworkAdapter<Blob = B::Blob>,
Protocol: DaProtocol,
Backend: DaBackend<Blob = Protocol::Blob>,
Backend::Blob: 'static,
Network: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
{
service_state: ServiceStateHandle<Self>,
backend: B,
network_relay: Relay<NetworkService<N::Backend>>,
backend: Backend,
da: Protocol,
network_relay: Relay<NetworkService<Network::Backend>>,
}
pub enum DaMsg<B: Blob> {
@ -52,36 +55,42 @@ impl<B: Blob + 'static> Debug for DaMsg<B> {
impl<B: Blob + 'static> RelayMessage for DaMsg<B> {}
impl<B, N> ServiceData for DataAvailabilityService<B, N>
impl<Protocol, Backend, Network> ServiceData for DataAvailabilityService<Protocol, Backend, Network>
where
B: DaBackend,
B::Blob: 'static,
N: NetworkAdapter<Blob = B::Blob>,
Protocol: DaProtocol,
Backend: DaBackend<Blob = Protocol::Blob>,
Backend::Blob: 'static,
Network: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
{
const SERVICE_ID: ServiceId = "DA";
type Settings = B::Settings;
type Settings = Settings<Protocol::Settings, Backend::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaMsg<B::Blob>;
type Message = DaMsg<Protocol::Blob>;
}
#[async_trait::async_trait]
impl<B, N> ServiceCore for DataAvailabilityService<B, N>
impl<Protocol, Backend, Network> ServiceCore for DataAvailabilityService<Protocol, Backend, Network>
where
B: DaBackend + Send + Sync,
B::Settings: Clone + Send + Sync + 'static,
B::Blob: Send,
<B::Blob as Blob>::Hash: Debug + Send + Sync,
// TODO: Reply type must be piped together, for now empty array.
N: NetworkAdapter<Blob = B::Blob, Attestation = [u8; 32]> + Send + Sync,
Protocol: DaProtocol + Send + Sync,
Backend: DaBackend<Blob = Protocol::Blob> + Send + Sync,
Protocol::Settings: Clone + Send + Sync + 'static,
Backend::Settings: Clone + Send + Sync + 'static,
Protocol::Blob: Send,
Protocol::Attestation: Send,
<Backend::Blob as Blob>::Hash: Debug + Send + Sync,
Network:
NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let network_relay = service_state.overwatch_handle.relay();
let backend_settings = service_state.settings_reader.get_updated_settings();
let backend = B::new(backend_settings);
let settings = service_state.settings_reader.get_updated_settings();
let backend = Backend::new(settings.backend);
let da = Protocol::new(settings.da_protocol);
Ok(Self {
service_state,
backend,
da,
network_relay,
})
}
@ -90,6 +99,7 @@ where
let Self {
mut service_state,
mut backend,
mut da,
network_relay,
} = self;
@ -98,12 +108,12 @@ where
.await
.expect("Relay connection with NetworkService should succeed");
let adapter = N::new(network_relay).await;
let adapter = Network::new(network_relay).await;
let mut network_blobs = adapter.blob_stream().await;
loop {
tokio::select! {
Some(blob) = network_blobs.next() => {
if let Err(e) = handle_new_blob(&mut backend, &adapter, blob).await {
if let Err(e) = handle_new_blob(&mut da, &mut backend, &adapter, blob).await {
tracing::debug!("Failed to add a new received blob: {e:?}");
}
}
@ -118,17 +128,25 @@ where
}
async fn handle_new_blob<
B: DaBackend,
A: NetworkAdapter<Blob = B::Blob, Attestation = [u8; 32]>,
Protocol: DaProtocol,
Backend: DaBackend<Blob = Protocol::Blob>,
A: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
>(
backend: &mut B,
da: &mut Protocol,
backend: &mut Backend,
adapter: &A,
blob: B::Blob,
blob: Protocol::Blob,
) -> Result<(), DaError> {
// we need to handle the reply (verification + signature)
let attestation = da.attest(&blob);
backend.add_blob(blob).await?;
// we do not call `da.recv_blob` here because that is meant to
// be called to retrieve the original data, while here we're only interested
// in storing the blob.
// We might want to refactor the backend to be part of implementations of the
// Da protocol instead of this service and clear this confusion.
adapter
.send_attestation([0u8; 32])
.send_attestation(attestation)
.await
.map_err(DaError::Dyn)
}
@ -157,3 +175,9 @@ where
}
Ok(())
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Settings<P, B> {
pub da_protocol: P,
pub backend: B,
}

View File

@ -1,4 +1,4 @@
mod adapters;
pub mod adapters;
// std
// crates

View File

@ -14,13 +14,15 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main"
nomos-core = { path = "../nomos-core" }
consensus-engine = { path = "../consensus-engine", features = ["serde"] }
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] }
nomos-da = { path = "../nomos-services/data-availability" }
full-replication = { path = "../nomos-da/full-replication" }
mixnode = { path = "../nodes/mixnode" }
mixnet-node = { path = "../mixnet/node" }
mixnet-client = { path = "../mixnet/client" }
mixnet-topology = { path = "../mixnet/topology" }
# Using older versions, since `mixnet-*` crates depend on `rand` v0.7.3.
rand = "0.7.3"
rand_xoshiro = "0.4"
rand_xoshiro = "0.6"
once_cell = "1"
secp256k1 = { version = "0.26", features = ["rand"] }
waku-bindings = { version = "0.1.1", optional = true }

View File

@ -282,6 +282,16 @@ fn create_node_config(
},
#[cfg(feature = "metrics")]
metrics: Default::default(),
#[cfg(feature = "libp2p")]
da: nomos_da::Settings {
da_protocol: full_replication::Settings {
num_attestations: 1,
},
backend: nomos_da::backend::memory_cache::BlobCacheSettings {
max_capacity: usize::MAX,
evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day
},
},
};
#[cfg(feature = "waku")]
{