DA: Consensus sampling (#708)

* Add sampling relay to consensus and massage all generics

* Pipe in sampling filtering of blob info

* Add mark in block

* Pipe validate block

* Refactor mark_in_block -> mark_complete

* Fix generics on tests

* Fix generics on tests

* Fix rebase

* Cargo fmt after rebase

* Sampling service configuration

* Sampling service config in indexer integration tests

---------

Co-authored-by: Gusto <bacvinka@gmail.com>
This commit is contained in:
Daniel Sanchez 2024-09-03 17:02:49 +02:00 committed by GitHub
parent 6f6bb61df4
commit a13f8611ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 678 additions and 85 deletions

View File

@ -22,6 +22,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling" }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
@ -40,6 +41,8 @@ nomos-system-sig = { path = "../../nomos-services/system-sig" }
tracing-subscriber = "0.3"
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" }
cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger" }
rand = "0.8"
rand_chacha = "0.3"
tokio = { version = "1.24", features = ["sync"] }
serde_json = "1.0"
serde_yaml = "0.9"

View File

@ -21,6 +21,7 @@ use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::metadata::Metadata;
use nomos_core::da::DaVerifier as CoreDaVerifier;
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_mempool::{
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
@ -29,6 +30,7 @@ use nomos_mempool::{
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tower_http::{
cors::{Any, CorsLayer},
@ -47,7 +49,19 @@ pub struct AxumBackendSettings {
pub cors_origins: Vec<String>,
}
pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
pub struct AxumBackend<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> {
settings: AxumBackendSettings,
_attestation: core::marker::PhantomData<A>,
_blob: core::marker::PhantomData<B>,
@ -56,6 +70,9 @@ pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
_verifier_backend: core::marker::PhantomData<VB>,
_tx: core::marker::PhantomData<T>,
_storage_serde: core::marker::PhantomData<S>,
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
}
#[derive(OpenApi)]
@ -72,7 +89,32 @@ pub struct AxumBackend<A, B, C, V, VB, T, S, const SIZE: usize> {
struct ApiDoc;
#[async_trait::async_trait]
impl<A, B, C, V, VB, T, S, const SIZE: usize> Backend for AxumBackend<A, B, C, V, VB, T, S, SIZE>
impl<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> Backend
for AxumBackend<
A,
B,
C,
V,
VB,
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>
where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
@ -119,6 +161,12 @@ where
<T as nomos_core::tx::Transaction>::Hash:
Serialize + for<'de> Deserialize<'de> + std::cmp::Ord + Debug + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send + 'static,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
@ -136,6 +184,9 @@ where
_verifier_backend: core::marker::PhantomData,
_tx: core::marker::PhantomData,
_storage_serde: core::marker::PhantomData,
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
})
}
@ -166,16 +217,45 @@ where
.route("/cl/status", routing::post(cl_status::<T>))
.route(
"/cryptarchia/info",
routing::get(cryptarchia_info::<T, S, SIZE>),
routing::get(
cryptarchia_info::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route(
"/cryptarchia/headers",
routing::get(cryptarchia_headers::<T, S, SIZE>),
routing::get(
cryptarchia_headers::<
T,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route("/da/add_blob", routing::post(add_blob::<A, B, VB, S>))
.route(
"/da/get_range",
routing::post(get_range::<T, C, V, S, SIZE>),
routing::post(
get_range::<
T,
C,
V,
S,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>,
),
)
.route("/network/info", routing::get(libp2p_info))
.route("/storage/block", routing::post(block::<S, T>))
@ -263,7 +343,14 @@ struct QueryParams {
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_info<Tx, SS, const SIZE: usize>(
async fn cryptarchia_info<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
) -> Response
where
@ -279,8 +366,21 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
make_request_and_return_response!(consensus::cryptarchia_info::<Tx, SS, SIZE>(&handle))
make_request_and_return_response!(consensus::cryptarchia_info::<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&handle))
}
#[utoipa::path(
@ -291,7 +391,14 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn cryptarchia_headers<Tx, SS, const SIZE: usize>(
async fn cryptarchia_headers<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(store): State<OverwatchHandle>,
Query(query): Query<QueryParams>,
) -> Response
@ -308,11 +415,22 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
let QueryParams { from, to } = query;
make_request_and_return_response!(consensus::cryptarchia_headers::<Tx, SS, SIZE>(
&store, from, to
))
make_request_and_return_response!(consensus::cryptarchia_headers::<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&store, from, to))
}
#[utoipa::path(
@ -358,7 +476,16 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn get_range<Tx, C, V, SS, const SIZE: usize>(
async fn get_range<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
Json(GetRangeReq { app_id, range }): Json<GetRangeReq<V>>,
) -> Response
@ -400,8 +527,23 @@ where
<V as Metadata>::Index:
AsRef<[u8]> + Clone + Serialize + DeserializeOwned + PartialOrd + Send + Sync,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
make_request_and_return_response!(da::get_range::<Tx, C, V, SS, SIZE>(&handle, app_id, range))
make_request_and_return_response!(da::get_range::<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SIZE,
>(&handle, app_id, range))
}
#[utoipa::path(

View File

@ -123,6 +123,7 @@ pub struct Config {
<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub da_indexer: <crate::DaIndexer as ServiceData>::Settings,
pub da_verifier: <crate::DaVerifier as ServiceData>::Settings,
pub da_sampling: <crate::DaSampling as ServiceData>::Settings,
pub http: <NomosApiService as ServiceData>::Settings,
pub cryptarchia: <crate::Cryptarchia as ServiceData>::Settings,
}

View File

@ -21,6 +21,9 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorage
use nomos_da_indexer::DataIndexerService;
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter;
use nomos_da_sampling::DaSamplingService;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter as VerifierNetworkAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
@ -41,13 +44,30 @@ use nomos_storage::{
use nomos_system_sig::SystemSig;
use overwatch_derive::*;
use overwatch_rs::services::handle::ServiceHandle;
use rand_chacha::ChaCha20Rng;
use serde::{de::DeserializeOwned, Serialize};
use subnetworks_assignations::versions::v1::FillFromNodeList;
// internal
pub use tx::Tx;
pub type NomosApiService =
ApiService<AxumBackend<(), DaBlob, BlobInfo, BlobInfo, KzgrsDaVerifier, Tx, Wire, MB16>>;
/// Membership used by the DA Network service.
pub type NomosDaMembership = FillFromNodeList;
pub type NomosApiService = ApiService<
AxumBackend<
(),
DaBlob,
BlobInfo,
BlobInfo,
KzgrsDaVerifier,
Tx,
Wire,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
MB16,
>,
>;
pub const CL_TOPIC: &str = "cl";
pub const DA_TOPIC: &str = "da";
@ -62,6 +82,9 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
>;
pub type TxMempool = TxMempoolService<
@ -88,6 +111,15 @@ pub type DaIndexer = DataIndexerService<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
>;
/// DA sampling service wiring: KZG-RS sampling backend with a libp2p
/// network adapter over the node's DA membership, using a ChaCha20-based RNG.
pub type DaSampling = DaSamplingService<
    KzgrsSamplingBackend<ChaCha20Rng>,
    SamplingLibp2pAdapter<NomosDaMembership>,
    ChaCha20Rng,
>;
pub type DaVerifier = DaVerifierService<
@ -103,7 +135,8 @@ pub struct Nomos {
network: ServiceHandle<NetworkService<NetworkBackend>>,
da_indexer: ServiceHandle<DaIndexer>,
da_verifier: ServiceHandle<DaVerifier>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_sampling: ServiceHandle<DaSampling>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<NomosDaMembership>>>,
cl_mempool: ServiceHandle<TxMempool>,
da_mempool: ServiceHandle<DaMempool>,
cryptarchia: ServiceHandle<Cryptarchia>,

View File

@ -92,6 +92,7 @@ fn main() -> Result<()> {
},
da_network: config.da_network,
da_indexer: config.da_indexer,
da_sampling: config.da_sampling,
da_verifier: config.da_verifier,
cryptarchia: config.cryptarchia,
#[cfg(feature = "metrics")]

View File

@ -100,7 +100,7 @@ where
}
#[must_use]
pub fn with_blobs_certificates(
pub fn with_blobs_info(
mut self,
blobs_certificates: impl Iterator<Item = B> + 'static,
) -> Self {

View File

@ -1,7 +1,9 @@
use crate::MembershipHandler;
use libp2p_identity::PeerId;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct FillWithOriginalReplication {
pub assignations: Vec<HashSet<PeerId>>,
pub subnetwork_size: usize,
@ -63,7 +65,7 @@ impl FillWithOriginalReplication {
}
impl MembershipHandler for FillWithOriginalReplication {
type NetworkId = u16;
type NetworkId = u32;
type Id = PeerId;
fn membership(&self, id: &Self::Id) -> HashSet<Self::NetworkId> {

View File

@ -23,11 +23,13 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
] }
nomos-metrics = { path = "../../nomos-services/metrics" }
nomos-da-indexer = { path = "../data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-sampling = { path = "../data-availability/sampling" }
nomos-da-verifier = { path = "../data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
full-replication = { path = "../../nomos-da/full-replication" }
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
rand = "0.8"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.33", default-features = false, features = ["sync"] }

View File

@ -1,6 +1,7 @@
use std::{fmt::Debug, hash::Hash};
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;
@ -15,12 +16,20 @@ use nomos_core::{
header::HeaderId,
tx::{select::FillSize as FillSizeWithTx, Transaction},
};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
};
use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde};
pub type Cryptarchia<Tx, SS, const SIZE: usize> = CryptarchiaConsensus<
pub type Cryptarchia<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
@ -29,9 +38,19 @@ pub type Cryptarchia<Tx, SS, const SIZE: usize> = CryptarchiaConsensus<
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobs<SIZE, BlobInfo>,
RocksBackend<SS>,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>;
pub async fn cryptarchia_info<Tx, SS, const SIZE: usize>(
pub async fn cryptarchia_info<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
handle: &OverwatchHandle,
) -> Result<CryptarchiaInfo, DynError>
where
@ -47,9 +66,15 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
let relay = handle
.relay::<Cryptarchia<Tx, SS, SIZE>>()
.relay::<Cryptarchia<Tx, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();
@ -61,7 +86,14 @@ where
Ok(receiver.await?)
}
pub async fn cryptarchia_headers<Tx, SS, const SIZE: usize>(
pub async fn cryptarchia_headers<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
handle: &OverwatchHandle,
from: Option<HeaderId>,
to: Option<HeaderId>,
@ -79,9 +111,15 @@ where
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
let relay = handle
.relay::<Cryptarchia<Tx, SS, SIZE>>()
.relay::<Cryptarchia<Tx, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -10,6 +10,7 @@ use nomos_da_indexer::DaMsg;
use nomos_da_indexer::{
consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter, DataIndexerService,
};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_verifier::backend::VerifierBackend;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
@ -20,6 +21,7 @@ use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use overwatch_rs::DynError;
use rand::{RngCore, SeedableRng};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::error::Error;
@ -27,7 +29,16 @@ use std::fmt::Debug;
use std::hash::Hash;
use tokio::sync::oneshot;
pub type DaIndexer<Tx, C, V, SS, const SIZE: usize> = DataIndexerService<
pub type DaIndexer<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
> = DataIndexerService<
// Indexer specific.
Bytes,
IndexerStorageAdapter<SS, V>,
@ -41,6 +52,9 @@ pub type DaIndexer<Tx, C, V, SS, const SIZE: usize> = DataIndexerService<
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobs<SIZE, V>,
RocksBackend<SS>,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>;
pub type DaVerifier<A, B, VB, SS> =
@ -73,7 +87,16 @@ where
Ok(receiver.await?)
}
pub async fn get_range<Tx, C, V, SS, const SIZE: usize>(
pub async fn get_range<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
const SIZE: usize,
>(
handle: &OverwatchHandle,
app_id: <V as Metadata>::AppId,
range: Range<<V as Metadata>::Index>,
@ -116,9 +139,15 @@ where
<V as Metadata>::Index:
AsRef<[u8]> + Serialize + DeserializeOwned + Clone + PartialOrd + Send + Sync,
SS: StorageSerde + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
let relay = handle
.relay::<DaIndexer<Tx, C, V, SS, SIZE>>()
.relay::<DaIndexer<Tx, C, V, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -12,6 +12,7 @@ chrono = "0.4"
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine", features = ["serde"] }
cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger", features = ["serde"] }
futures = "0.3"
nomos-da-sampling = { path = "../data-availability/sampling" }
nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" }
nomos-core = { path = "../../nomos-core" }

View File

@ -16,6 +16,8 @@ use nomos_core::{
block::{builder::BlockBuilder, Block},
header::cryptarchia::Builder,
};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_da_sampling::{DaSamplingService, DaSamplingServiceMsg};
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg,
TxMempoolService,
@ -29,8 +31,11 @@ use overwatch_rs::services::{
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use overwatch_rs::DynError;
use rand::{RngCore, SeedableRng};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_with::serde_as;
use std::collections::BTreeSet;
use std::hash::Hash;
use thiserror::Error;
pub use time::Config as TimeConfig;
@ -41,6 +46,7 @@ use tracing::{error, instrument, span, Level};
use tracing_futures::Instrument;
type MempoolRelay<Payload, Item, Key> = OutboundRelay<MempoolMsg<HeaderId, Payload, Item, Key>>;
type SamplingRelay<BlobId> = OutboundRelay<DaSamplingServiceMsg<BlobId>>;
// Limit the number of blocks returned by GetHeaders
const HEADERS_LIMIT: usize = 512;
@ -134,8 +140,19 @@ impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
}
}
pub struct CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
where
pub struct CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> where
A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
@ -150,6 +167,12 @@ where
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobSelect<BlobId = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly
@ -157,12 +180,37 @@ where
network_relay: Relay<NetworkService<A::Backend>>,
cl_mempool_relay: Relay<TxMempoolService<ClPoolAdapter, ClPool>>,
da_mempool_relay: Relay<DaMempoolService<DaPoolAdapter, DaPool>>,
sampling_relay: Relay<DaSamplingService<SamplingBackend, SamplingNetworkAdapter, SamplingRng>>,
block_subscription_sender: broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
storage_relay: Relay<StorageService<Storage>>,
}
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceData
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
impl<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> ServiceData
for CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
A: NetworkAdapter,
ClPool: MemPool<BlockId = HeaderId>,
@ -177,6 +225,12 @@ where
TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobSelect<BlobId = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
const SERVICE_ID: ServiceId = CRYPTARCHIA_ID;
type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>;
@ -186,8 +240,32 @@ where
}
#[async_trait::async_trait]
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceCore
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
impl<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> ServiceCore
for CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
A: NetworkAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
+ Clone
@ -196,7 +274,7 @@ where
+ 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId, Key = SamplingBackend::BlobId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
@ -221,7 +299,6 @@ where
+ Sync
+ 'static,
ClPool::Key: Debug + Send + Sync,
DaPool::Key: Debug + Send + Sync,
ClPoolAdapter:
MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key> + Send + Sync + 'static,
@ -231,12 +308,20 @@ where
BS: BlobSelect<BlobId = DaPool::Item> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static,
Storage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + Send + 'static,
SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay();
let storage_relay = service_state.overwatch_handle.relay();
let sampling_relay = service_state.overwatch_handle.relay();
let (block_subscription_sender, _) = broadcast::channel(16);
Ok(Self {
service_state,
@ -245,6 +330,7 @@ where
da_mempool_relay,
block_subscription_sender,
storage_relay,
sampling_relay,
})
}
@ -273,6 +359,12 @@ where
.await
.expect("Relay connection with StorageService should succeed");
let sampling_relay: OutboundRelay<_> = self
.sampling_relay
.connect()
.await
.expect("Relay connection with SamplingService should succeed");
let CryptarchiaSettings {
config,
genesis_state,
@ -317,6 +409,7 @@ where
storage_relay.clone(),
cl_mempool_relay.clone(),
da_mempool_relay.clone(),
sampling_relay.clone(),
&mut self.block_subscription_sender
)
.await;
@ -341,6 +434,7 @@ where
blob_selector.clone(),
cl_mempool_relay.clone(),
da_mempool_relay.clone(),
sampling_relay.clone(),
).await;
if let Some(block) = block {
@ -367,14 +461,36 @@ where
}
}
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
impl<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
+ Clone
@ -396,15 +512,22 @@ where
+ Send
+ Sync
+ 'static,
DaPool: MemPool<BlockId = HeaderId, Key = SamplingBackend::BlobId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
BS: BlobSelect<BlobId = DaPool::Item> + Clone + Send + Sync + 'static,
ClPool::Key: Debug + Send + Sync,
DaPool::Key: Debug + Send + Sync,
ClPoolAdapter:
MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key> + Send + Sync + 'static,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
Storage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
@ -479,7 +602,14 @@ where
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
#[instrument(
level = "debug",
skip(cryptarchia, storage_relay, cl_mempool_relay, da_mempool_relay, leader)
skip(
cryptarchia,
storage_relay,
cl_mempool_relay,
da_mempool_relay,
sampling_relay,
leader
)
)]
async fn process_block(
mut cryptarchia: Cryptarchia,
@ -492,6 +622,7 @@ where
da_mempool_relay: OutboundRelay<
MempoolMsg<HeaderId, DaPoolAdapter::Payload, DaPool::Item, DaPool::Key>,
>,
sampling_relay: SamplingRelay<DaPool::Key>,
block_broadcaster: &mut broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
) -> Cryptarchia {
tracing::debug!("received proposal {:?}", block);
@ -500,6 +631,18 @@ where
let header = block.header().cryptarchia();
let id = header.id();
let sampled_blobs = match get_sampled_blobs(sampling_relay.clone()).await {
Ok(sampled_blobs) => sampled_blobs,
Err(error) => {
error!("Unable to retrieved sampled blobs: {error}");
return cryptarchia;
}
};
if !Self::validate_block(&block, &sampled_blobs) {
error!("Invalid block: {block:?}");
return cryptarchia;
}
match cryptarchia.try_apply_header(header) {
Ok(new_state) => {
// update leader
@ -512,7 +655,6 @@ where
id,
)
.await;
mark_in_block(
da_mempool_relay,
block.blobs().map(DispersedBlobInfo::blob_id),
@ -520,6 +662,12 @@ where
)
.await;
mark_blob_in_block(
sampling_relay,
block.blobs().map(DispersedBlobInfo::blob_id).collect(),
)
.await;
// store block
let msg = <StorageMsg<_>>::new_store_message(header.id(), block.clone());
if let Err((e, _msg)) = storage_relay.send(msg).await {
@ -544,7 +692,13 @@ where
#[instrument(
level = "debug",
skip(cl_mempool_relay, da_mempool_relay, tx_selector, blob_selector)
skip(
cl_mempool_relay,
da_mempool_relay,
sampling_relay,
tx_selector,
blob_selector
)
)]
async fn propose_block(
parent: HeaderId,
@ -553,17 +707,20 @@ where
blob_selector: BS,
cl_mempool_relay: MempoolRelay<ClPool::Item, ClPool::Item, ClPool::Key>,
da_mempool_relay: MempoolRelay<DaPoolAdapter::Payload, DaPool::Item, DaPool::Key>,
sampling_relay: SamplingRelay<SamplingBackend::BlobId>,
) -> Option<Block<ClPool::Item, DaPool::Item>> {
let mut output = None;
let cl_txs = get_mempool_contents(cl_mempool_relay);
let da_certs = get_mempool_contents(da_mempool_relay);
match futures::join!(cl_txs, da_certs) {
(Ok(cl_txs), Ok(da_certs)) => {
let blobs_ids = get_sampled_blobs(sampling_relay);
match futures::join!(cl_txs, da_certs, blobs_ids) {
(Ok(cl_txs), Ok(da_blobs_info), Ok(blobs_ids)) => {
let Ok(block) = BlockBuilder::new(tx_selector, blob_selector)
.with_cryptarchia_builder(Builder::new(parent, proof))
.with_transactions(cl_txs)
.with_blobs_certificates(da_certs)
.with_blobs_info(
da_blobs_info.filter(move |info| blobs_ids.contains(&info.blob_id())),
)
.build()
else {
panic!("Proposal block should always succeed to be built")
@ -571,11 +728,30 @@ where
tracing::debug!("proposed block with id {:?}", block.header().id());
output = Some(block);
}
(Err(_), _) => tracing::error!("Could not fetch block cl transactions"),
(_, Err(_)) => tracing::error!("Could not fetch block da certificates"),
(tx_error, da_certificate_error, blobs_error) => {
if let Err(_tx_error) = tx_error {
tracing::error!("Could not fetch block cl transactions");
}
if let Err(_da_certificate_error) = da_certificate_error {
tracing::error!("Could not fetch block da certificates");
}
if let Err(_blobs_error) = blobs_error {
tracing::error!("Could not fetch block da blobs");
}
}
}
output
}
/// Check that every blob referenced by `block` has been validated via DA
/// sampling.
///
/// Returns `true` iff the id of every blob in the block is present in
/// `sampled_blobs_ids` (vacuously true for a block that carries no blobs).
fn validate_block(
    block: &Block<ClPool::Item, DaPool::Item>,
    sampled_blobs_ids: &BTreeSet<DaPool::Key>,
) -> bool {
    // `all` short-circuits on the first unsampled blob.
    // (Previously bound to a redundant local and returned — clippy `let_and_return`.)
    block
        .blobs()
        .all(|blob| sampled_blobs_ids.contains(&blob.blob_id()))
}
}
#[derive(Debug)]
@ -633,3 +809,28 @@ async fn mark_in_block<Payload, Item, Key>(
.await
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
}
/// Notifies the sampling service that the given blobs were included in a
/// block, so it can mark them as completed and stop tracking them.
///
/// Failures are logged (with the blob ids recovered from the returned
/// message) rather than propagated — marking is best-effort.
async fn mark_blob_in_block<BlobId: Debug>(
    sampling_relay: SamplingRelay<BlobId>,
    blobs_id: Vec<BlobId>,
) {
    let request = DaSamplingServiceMsg::MarkInBlock { blobs_id };
    match sampling_relay.send(request).await {
        Ok(()) => {}
        // The relay hands the undelivered message back; destructure it to
        // recover the blob ids for the error log.
        Err((_e, DaSamplingServiceMsg::MarkInBlock { blobs_id })) => {
            error!("Error marking in block for blobs ids: {blobs_id:?}");
        }
        // Only a MarkInBlock message can come back from this send; other
        // variants are unreachable in practice and ignored, as before.
        Err(_) => {}
    }
}
async fn get_sampled_blobs<BlobId>(
sampling_relay: SamplingRelay<BlobId>,
) -> Result<BTreeSet<BlobId>, DynError> {
let (sender, receiver) = oneshot::channel();
sampling_relay
.send(DaSamplingServiceMsg::GetValidatedBlobs {
reply_channel: sender,
})
.await
.map_err(|(error, _)| Box::new(error) as DynError)?;
receiver.await.map_err(|error| Box::new(error) as DynError)
}

View File

@ -11,14 +11,16 @@ bytes = "1.2"
futures = "0.3"
nomos-core = { path = "../../../nomos-core" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-da-sampling = { path = "../sampling" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tokio = { version = "1", features = ["sync"] }
serde = { version = "1.0", features = ["derive"] }
rand = "0.8"
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1.15"
[features]

View File

@ -13,6 +13,7 @@ use nomos_core::block::Block;
use nomos_core::da::blob::{info::DispersedBlobInfo, metadata::Metadata, BlobSelect};
use nomos_core::header::HeaderId;
use nomos_core::tx::{Transaction, TxSelect};
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{backend::MemPool, network::NetworkAdapter as MempoolAdapter};
use nomos_storage::backends::StorageBackend;
use nomos_storage::StorageService;
@ -22,14 +23,40 @@ use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use rand::{RngCore, SeedableRng};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use storage::DaStorageAdapter;
use tokio::sync::oneshot::Sender;
use tracing::error;
pub type ConsensusRelay<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> =
Relay<CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>>;
pub type ConsensusRelay<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> = Relay<
CryptarchiaConsensus<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
Storage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>,
>;
pub struct DataIndexerService<
B,
@ -43,6 +70,9 @@ pub struct DataIndexerService<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> where
B: 'static,
A: NetworkAdapter,
@ -60,11 +90,28 @@ pub struct DataIndexerService<
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
service_state: ServiceStateHandle<Self>,
storage_relay: Relay<StorageService<DaStorage::Backend>>,
consensus_relay:
ConsensusRelay<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, ConsensusStorage>,
consensus_relay: ConsensusRelay<
A,
ClPool,
ClPoolAdapter,
DaPool,
DaPoolAdapter,
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>,
}
pub enum DaMsg<B, V: Metadata> {
@ -105,6 +152,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> ServiceData
for DataIndexerService<
B,
@ -118,6 +168,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
B: 'static,
@ -136,6 +189,12 @@ where
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
const SERVICE_ID: ServiceId = "DaIndexer";
type Settings = IndexerSettings<DaStorage::Settings>;
@ -156,6 +215,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
DataIndexerService<
B,
@ -169,6 +231,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
B: Send + Sync + 'static,
@ -188,6 +253,12 @@ where
BS: BlobSelect<BlobId = DaPool::Item>,
DaStorage: DaStorageAdapter<Info = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
async fn handle_new_block(
storage_adapter: &DaStorage,
@ -249,6 +320,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
> ServiceCore
for DataIndexerService<
B,
@ -262,6 +336,9 @@ impl<
TxS,
BS,
ConsensusStorage,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
>
where
B: Debug + Send + Sync,
@ -304,6 +381,12 @@ where
DaStorage::Settings: Clone + Send + Sync + 'static,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
Consensus: ConsensusAdapter<Tx = ClPool::Item, Cert = DaPool::Item> + Send + Sync,
SamplingRng: SeedableRng + RngCore,
SamplingBackend: DaSamplingServiceBackend<SamplingRng> + Send,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let consensus_relay = service_state.overwatch_handle.relay();

View File

@ -7,6 +7,7 @@ use std::time::{Duration, Instant};
use hex;
use rand::distributions::Standard;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time;
use tokio::time::Interval;
@ -24,21 +25,21 @@ pub struct SamplingContext {
started: Instant,
}
#[derive(Debug, Clone)]
pub struct KzgrsDaSamplerSettings {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KzgrsSamplingBackendSettings {
pub num_samples: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
}
pub struct KzgrsDaSampler<R: Rng> {
settings: KzgrsDaSamplerSettings,
pub struct KzgrsSamplingBackend<R: Rng> {
settings: KzgrsSamplingBackendSettings,
validated_blobs: BTreeSet<BlobId>,
pending_sampling_blobs: HashMap<BlobId, SamplingContext>,
rng: R,
}
impl<R: Rng> KzgrsDaSampler<R> {
impl<R: Rng> KzgrsSamplingBackend<R> {
fn prune_by_time(&mut self) {
self.pending_sampling_blobs.retain(|_blob_id, context| {
context.started.elapsed() < self.settings.blobs_validity_duration
@ -47,8 +48,8 @@ impl<R: Rng> KzgrsDaSampler<R> {
}
#[async_trait::async_trait]
impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsDaSampler<R> {
type Settings = KzgrsDaSamplerSettings;
impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsSamplingBackend<R> {
type Settings = KzgrsSamplingBackendSettings;
type BlobId = BlobId;
type Blob = DaBlob;
@ -62,7 +63,7 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsDaSampler<R> {
}
}
async fn prune_interval(&self) -> Interval {
fn prune_interval(&self) -> Interval {
time::interval(self.settings.old_blobs_check_interval)
}
@ -70,7 +71,7 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsDaSampler<R> {
self.validated_blobs.clone()
}
async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]) {
async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]) {
for id in blobs_ids {
self.pending_sampling_blobs.remove(id);
self.validated_blobs.remove(id);
@ -142,20 +143,20 @@ mod test {
use rand::rngs::StdRng;
use crate::backend::kzgrs::{
DaSamplingServiceBackend, KzgrsDaSampler, KzgrsDaSamplerSettings, SamplingContext,
SamplingState,
DaSamplingServiceBackend, KzgrsSamplingBackend, KzgrsSamplingBackendSettings,
SamplingContext, SamplingState,
};
use kzgrs_backend::common::{blob::DaBlob, Column};
use nomos_core::da::BlobId;
fn create_sampler(subnet_num: usize) -> KzgrsDaSampler<StdRng> {
let settings = KzgrsDaSamplerSettings {
fn create_sampler(subnet_num: usize) -> KzgrsSamplingBackend<StdRng> {
let settings = KzgrsSamplingBackendSettings {
num_samples: subnet_num as u16,
old_blobs_check_interval: Duration::from_millis(20),
blobs_validity_duration: Duration::from_millis(10),
};
let rng = StdRng::from_entropy();
KzgrsDaSampler::new(settings, rng)
KzgrsSamplingBackend::new(settings, rng)
}
#[tokio::test]
@ -204,7 +205,7 @@ mod test {
// mark in block for both
// collections should be reset
sampler.mark_in_block(&[b1, b2]).await;
sampler.mark_completed(&[b1, b2]).await;
assert!(sampler.pending_sampling_blobs.is_empty());
assert!(sampler.validated_blobs.is_empty());
@ -301,7 +302,7 @@ mod test {
// run mark_in_block for the same blob
// should return empty for everything
sampler.mark_in_block(&[b1]).await;
sampler.mark_completed(&[b1]).await;
assert!(sampler.validated_blobs.is_empty());
assert!(sampler.pending_sampling_blobs.is_empty());
}

View File

@ -2,11 +2,9 @@ pub mod kzgrs;
// std
use std::collections::BTreeSet;
// crates
use rand::Rng;
use tokio::time::Interval;
//
// internal
use nomos_da_network_core::SubnetworkId;
@ -24,10 +22,10 @@ pub trait DaSamplingServiceBackend<R: Rng> {
fn new(settings: Self::Settings, rng: R) -> Self;
async fn get_validated_blobs(&self) -> BTreeSet<Self::BlobId>;
async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]);
async fn mark_completed(&mut self, blobs_ids: &[Self::BlobId]);
async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob);
async fn handle_sampling_error(&mut self, blob_id: Self::BlobId);
async fn init_sampling(&mut self, blob_id: Self::BlobId) -> SamplingState;
async fn prune_interval(&self) -> Interval;
fn prune_interval(&self) -> Interval;
fn prune(&mut self);
}

View File

@ -22,6 +22,7 @@ use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";
@ -39,7 +40,7 @@ pub enum DaSamplingServiceMsg<BlobId> {
},
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings> {
pub sampling_settings: BackendSettings,
pub network_adapter_settings: NetworkSettings,
@ -47,7 +48,7 @@ pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings> {
impl<B: 'static> RelayMessage for DaSamplingServiceMsg<B> {}
pub struct DaSamplingService<Backend, N, S, R>
pub struct DaSamplingService<Backend, N, R>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R> + Send,
@ -62,7 +63,7 @@ where
sampler: Backend,
}
impl<Backend, N, S, R> DaSamplingService<Backend, N, S, R>
impl<Backend, N, R> DaSamplingService<Backend, N, R>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + 'static,
@ -111,7 +112,7 @@ where
}
}
DaSamplingServiceMsg::MarkInBlock { blobs_id } => {
sampler.mark_in_block(&blobs_id).await;
sampler.mark_completed(&blobs_id).await;
}
}
}
@ -132,7 +133,7 @@ where
}
}
impl<Backend, N, S, R> ServiceData for DaSamplingService<Backend, N, S, R>
impl<Backend, N, R> ServiceData for DaSamplingService<Backend, N, R>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R> + Send,
@ -150,7 +151,7 @@ where
}
#[async_trait::async_trait]
impl<Backend, N, S, R> ServiceCore for DaSamplingService<Backend, N, S, R>
impl<Backend, N, R> ServiceCore for DaSamplingService<Backend, N, R>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + Sync + 'static,
@ -189,7 +190,7 @@ where
let mut network_adapter = N::new(network_relay).await;
let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?;
let mut next_prune_tick = sampler.prune_interval().await;
let mut next_prune_tick = sampler.prune_interval();
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
async {

View File

@ -17,9 +17,10 @@ use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaNetworkSamplingSettings {
pub num_samples: u16,
pub subnet_size: SubnetworkId,

View File

@ -15,6 +15,8 @@ kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
nomos-core = { path = "../../../nomos-core" }
nomos-da-indexer = { path = "../indexer", features = ["rocksdb-backend"] }
nomos-da-verifier = { path = "../verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-da-sampling = { path = "../sampling" }
nomos-da-network-service = { path = "../network" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-node = { path = "../../../nodes/nomos-node" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
@ -24,12 +26,14 @@ nomos-network = { path = "../../network", features = ["mock"] }
nomos-libp2p = { path = "../../../nomos-libp2p" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
rand = "0.8"
rand_chacha = "0.3"
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1.15"
tempfile = "3.6"
tracing = "0.1"
time = "0.3"
rand = "0.8"
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
[dev-dependencies]
blake2 = { version = "0.10" }

View File

@ -5,6 +5,10 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa
use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter;
use nomos_da_indexer::DataIndexerService;
use nomos_da_sampling::{
backend::kzgrs::KzgrsSamplingBackend,
network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter,
};
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
@ -13,6 +17,8 @@ use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
use nomos_storage::backends::rocksdb::RocksBackend;
use rand_chacha::ChaCha20Rng;
use subnetworks_assignations::versions::v2::FillWithOriginalReplication;
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
@ -29,6 +35,9 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<FillWithOriginalReplication>,
ChaCha20Rng,
>;
pub(crate) type DaIndexer = DataIndexerService<
@ -45,6 +54,9 @@ pub(crate) type DaIndexer = DataIndexerService<
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobs<MB16, BlobInfo>,
RocksBackend<Wire>,
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<FillWithOriginalReplication>,
ChaCha20Rng,
>;
pub(crate) type TxMempool = TxMempoolService<

View File

@ -16,8 +16,14 @@ use nomos_core::da::blob::metadata::Metadata as _;
use nomos_core::tx::Transaction;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::{
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
};
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_storage::fs::write_blob;
use nomos_da_storage::rocksdb::DA_VERIFIED_KEY_PREFIX;
use nomos_libp2p::{ed25519, identity, PeerId};
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
@ -29,6 +35,7 @@ use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
use rand::{thread_rng, Rng};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::{NamedTempFile, TempDir};
use time::OffsetDateTime;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
@ -39,6 +46,7 @@ use crate::common::*;
struct IndexerNode {
network: ServiceHandle<NetworkService<NetworkBackend>>,
cl_mempool: ServiceHandle<TxMempool>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_mempool: ServiceHandle<DaMempool>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
cryptarchia: ServiceHandle<Cryptarchia>,
@ -63,6 +71,18 @@ fn new_node(
initial_peers,
},
},
da_network: DaNetworkConfig {
backend: DaNetworkValidatorBackendSettings {
node_key: ed25519::SecretKey::generate(),
membership: FillFromNodeList::new(
&[PeerId::from(identity::Keypair::generate_ed25519().public())],
2,
1,
),
addresses: Default::default(),
listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::<Multiaddr>().unwrap(),
},
},
cl_mempool: TxMempoolSettings {
backend: (),
network: AdapterSettings {
@ -107,6 +127,7 @@ fn new_node(
// TODO: When verifier is implemented this test should be removed and a new one
// performed in integration tests crate using the real node.
#[ignore = "Sampling needs to be started in mempool"]
#[test]
fn test_indexer() {
let performed_tx = Arc::new(AtomicBool::new(false));

View File

@ -20,6 +20,7 @@ nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2
nomos-da-network-service = { path = "../nomos-services/data-availability/network" }
nomos-da-indexer = { path = "../nomos-services/data-availability/indexer" }
nomos-da-verifier = { path = "../nomos-services/data-availability/verifier" }
nomos-da-sampling = { path = "../nomos-services/data-availability/sampling" }
subnetworks-assignations = { path = "../nomos-da/network/subnetworks-assignations" }
full-replication = { path = "../nomos-da/full-replication" }
hex = "0.4.3"

View File

@ -33,6 +33,9 @@ use nomos_network::backends::libp2p::mixnet::MixnetConfig;
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::{api::AxumBackendSettings, Config, Tx};
// crates
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng, RngCore};
use reqwest::{Client, Url};
@ -419,6 +422,19 @@ fn create_node_config(
cors_origins: vec![],
},
},
da_sampling: DaSamplingServiceSettings {
// TODO: setup this properly!
sampling_settings: KzgrsSamplingBackendSettings {
num_samples: 0,
// Sampling service period can't be zero.
old_blobs_check_interval: Duration::from_secs(1),
blobs_validity_duration: Duration::from_secs(1),
},
network_adapter_settings: DaNetworkSamplingSettings {
num_samples: 0,
subnet_size: 0,
},
},
};
config.network.backend.inner.port = get_available_port();