Integrate dispersal service into executor node (#812)

This commit is contained in:
gusto 2024-10-08 17:03:38 +03:00 committed by GitHub
parent 22dfb51eba
commit 07c9096924
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 59 additions and 11 deletions

View File

@ -12,6 +12,7 @@ hyper = { version = "0.14", features = ["full"] }
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
nomos-api = { path = "../../nomos-services/api" }
nomos-core = { path = "../../nomos-core" }
nomos-da-dispersal = { path = "../../nomos-services/data-availability/dispersal" }
nomos-da-network-core = { path = "../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] }

View File

@ -19,6 +19,7 @@ use crate::ExecutorApiService;
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub da_dispersal: <crate::DaDispersal as ServiceData>::Settings,
pub da_network:
<DaNetworkService<DaNetworkExecutorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub da_indexer: <crate::DaIndexer as ServiceData>::Settings,

View File

@ -3,18 +3,23 @@ pub mod config;
// std
// crates
use rand_chacha::ChaCha20Rng;
// internal
use api::backend::AxumBackend;
use kzgrs_backend::common::blob::DaBlob;
use nomos_api::ApiService;
use nomos_da_dispersal::adapters::mempool::kzgrs::KzgrsMempoolAdapter;
use nomos_da_dispersal::adapters::network::libp2p::Libp2pNetworkAdapter as DispersalNetworkAdapter;
use nomos_da_dispersal::backend::kzgrs::DispersalKZGRSBackend;
use nomos_da_dispersal::DispersalService;
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_node::*;
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;
use rand_chacha::ChaCha20Rng;
// internal
use api::backend::AxumBackend;
pub type ExecutorApiService = ApiService<
AxumBackend<
@ -34,11 +39,29 @@ pub type ExecutorApiService = ApiService<
>,
>;
pub type DispersalMempoolAdapter = KzgrsMempoolAdapter<
MempoolNetworkAdapter<BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;
pub type DaDispersal = DispersalService<
DispersalKZGRSBackend<DispersalNetworkAdapter<NomosDaMembership>, DispersalMempoolAdapter>,
DispersalNetworkAdapter<NomosDaMembership>,
DispersalMempoolAdapter,
NomosDaMembership,
kzgrs_backend::dispersal::Metadata,
>;
#[derive(Services)]
pub struct NomosExecutor {
#[cfg(feature = "tracing")]
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
da_dispersal: ServiceHandle<DaDispersal>,
da_indexer: ServiceHandle<DaIndexer>,
da_verifier: ServiceHandle<DaVerifier>,
da_sampling: ServiceHandle<DaSampling>,

View File

@ -86,6 +86,7 @@ fn main() -> Result<()> {
},
registry: registry.clone(),
},
da_dispersal: config.da_dispersal,
da_network: config.da_network,
da_indexer: config.da_indexer,
da_sampling: config.da_sampling,

View File

@ -15,6 +15,7 @@ nomos-mempool = { path = "../../mempool", features = ["libp2p"] }
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
tokio = "1.40"
thiserror = "1.0"

View File

@ -4,19 +4,28 @@ use std::time::Duration;
// crates
use futures::StreamExt;
use itertools::izip;
use serde::{Deserialize, Serialize};
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
use crate::backend::DispersalBackend;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::{build_blob_id, Column, ColumnIndex};
use kzgrs_backend::encoder::EncodedData;
use kzgrs_backend::encoder::{DaEncoderParams, EncodedData};
use kzgrs_backend::{dispersal, encoder};
use nomos_core::da::{BlobId, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EncoderSettings {
pub num_columns: usize,
pub with_cache: bool,
pub global_params_path: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DispersalKZGRSBackendSettings {
pub encoder_settings: encoder::DaEncoderParams,
pub encoder_settings: EncoderSettings,
pub dispersal_timeout: Duration,
}
pub struct DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter> {
@ -37,7 +46,7 @@ pub struct DispersalFromAdapter<Adapter> {
impl<Adapter> DaDispersal for DispersalFromAdapter<Adapter>
where
Adapter: DispersalNetworkAdapter + Send + Sync,
Adapter::SubnetworkId: From<usize> + Send + Sync,
Adapter::SubnetworkId: From<u32> + Send + Sync,
{
type EncodedData = EncodedData;
type Error = DynError;
@ -52,7 +61,9 @@ where
let reponses_stream = adapter.dispersal_events_stream().await?;
for (subnetwork_id, blob) in encoded_data_to_da_blobs(encoded_data).enumerate() {
adapter.disperse(subnetwork_id.into(), blob).await?;
adapter
.disperse((subnetwork_id as u32).into(), blob)
.await?;
}
let valid_responses = reponses_stream
@ -77,7 +88,7 @@ impl<NetworkAdapter, MempoolAdapter> DispersalBackend
for DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter>
where
NetworkAdapter: DispersalNetworkAdapter + Send + Sync,
NetworkAdapter::SubnetworkId: From<usize> + Send + Sync,
NetworkAdapter::SubnetworkId: From<u32> + Send + Sync,
MempoolAdapter: DaMempoolAdapter<BlobId = BlobId, Metadata = dispersal::Metadata> + Send + Sync,
{
type Settings = DispersalKZGRSBackendSettings;
@ -93,7 +104,16 @@ where
network_adapter: Self::NetworkAdapter,
mempool_adapter: Self::MempoolAdapter,
) -> Self {
let encoder = Self::Encoder::new(settings.encoder_settings.clone());
let encoder_settings = &settings.encoder_settings;
let global_params = kzgrs_backend::global::global_parameters_from_file(
&encoder_settings.global_params_path,
)
.expect("Global encoder params should be available");
let encoder = Self::Encoder::new(DaEncoderParams::new(
encoder_settings.num_columns,
encoder_settings.with_cache,
global_params,
));
Self {
settings,
network_adapter: Arc::new(network_adapter),

View File

@ -2,6 +2,7 @@
use std::fmt::Debug;
use std::marker::PhantomData;
// crates
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tracing::log::error;
// internal
@ -17,7 +18,7 @@ use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
mod adapters;
pub mod adapters;
pub mod backend;
const DA_DISPERSAL_TAG: ServiceId = "DA-Encoder";
@ -33,7 +34,7 @@ pub enum DaDispersalMsg<Metadata> {
impl<Metadata: 'static> RelayMessage for DaDispersalMsg<Metadata> {}
#[derive(Clone)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DispersalServiceSettings<BackendSettings> {
pub backend: BackendSettings,
}