DA: Executor dispersal service (#806)

* Added basic dispersal structure

* Expand impl

* Pull events from network service

* Implement network adapter
Tie types together in service

* Fill up service processing

* Tie up types in kzgrs backend implementation

* Implement disperse

* Pipe encode and dispersal

* Create mempool adapter

* Add mempool adapter

* Tiny comment
Daniel Sanchez 2024-10-04 15:06:42 +02:00 committed by GitHub
parent d17afcbe4d
commit 98e9cde66d
14 changed files with 510 additions and 9 deletions

@@ -20,6 +20,7 @@ members = [
"nomos-services/data-availability/network",
"nomos-services/data-availability/sampling",
"nomos-services/data-availability/verifier",
"nomos-services/data-availability/dispersal",
"nomos-services/data-availability/tests",
"nomos-da/full-replication",
"nomos-mix/message",

@@ -27,7 +27,3 @@ pub trait DaDispersal {
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error>;
}
pub trait Signer {
fn sign(&self, message: &[u8]) -> Vec<u8>;
}

@@ -19,7 +19,7 @@ pub fn rand_data(elements_count: usize) -> Vec<u8> {
fn encode<const SIZE: usize>(bencher: Bencher, column_size: usize) {
bencher
.with_inputs(|| {
let params = DaEncoderParams::new(column_size, true);
let params = DaEncoderParams::new(column_size, true, /* ark_poly_commit::kzg10::data_structures::UniversalParams<ark_ec::models::bls12::Bls12<ark_bls12_381::curves::Config>> */);
(
DaEncoder::new(params),
rand_data(SIZE * MB / DaEncoderParams::MAX_BLS12_381_ENCODING_CHUNK_SIZE),

@@ -3,3 +3,5 @@ pub mod dispersal;
pub mod encoder;
pub mod global;
pub mod verifier;
pub use kzgrs::KzgRsError;

@@ -7,3 +7,4 @@ pub mod swarm;
pub mod test_utils;
pub type SubnetworkId = u32;
pub use libp2p::PeerId;

@@ -0,0 +1,19 @@
[package]
name = "nomos-da-dispersal"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
futures = "0.3"
itertools = "0.13"
nomos-core = { path = "../../../nomos-core" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-mempool = { path = "../../mempool", features = ["libp2p"] }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
tokio = "1.40"
thiserror = "1.0"
tracing = "0.1"

@@ -0,0 +1,13 @@
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
#[async_trait::async_trait]
pub trait DaMempoolAdapter {
type MempoolService: ServiceData;
type BlobId;
fn new(outbound_relay: OutboundRelay<<Self::MempoolService as ServiceData>::Message>) -> Self;
async fn post_blob_id(&self, blob_id: Self::BlobId) -> Result<(), DynError>;
}
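No concrete adapter for this trait ships in the commit; the kzgrs backend below only requires something implementing it. As a minimal sketch of the intended wiring, the following hypothetical adapter forwards blob ids over the mempool relay, mirroring the error-mapping pattern used by the libp2p network adapter later in this diff. `MempoolMsg`, `MockDaMempoolService`, and the `PostBlobId` variant are illustrative stand-ins, not the real nomos-mempool types.

// Sketch only (not part of this commit); assumes the `DaMempoolAdapter` trait
// above is in scope. `MempoolMsg` and `MockDaMempoolService` are hypothetical
// stand-ins for the real nomos-mempool message and service types.
use overwatch_rs::services::relay::{OutboundRelay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceData, ServiceId};
use overwatch_rs::DynError;

#[derive(Debug)]
pub enum MempoolMsg {
    PostBlobId([u8; 32]),
}

impl RelayMessage for MempoolMsg {}

pub struct MockDaMempoolService;

impl ServiceData for MockDaMempoolService {
    const SERVICE_ID: ServiceId = "MockDaMempool";
    type Settings = ();
    type State = NoState<()>;
    type StateOperator = NoOperator<Self::State>;
    type Message = MempoolMsg;
}

pub struct MockMempoolAdapter {
    outbound_relay: OutboundRelay<MempoolMsg>,
}

#[async_trait::async_trait]
impl DaMempoolAdapter for MockMempoolAdapter {
    type MempoolService = MockDaMempoolService;
    type BlobId = [u8; 32];

    fn new(outbound_relay: OutboundRelay<MempoolMsg>) -> Self {
        Self { outbound_relay }
    }

    async fn post_blob_id(&self, blob_id: Self::BlobId) -> Result<(), DynError> {
        // Forward the blob id to the mempool service, surfacing relay errors.
        self.outbound_relay
            .send(MempoolMsg::PostBlobId(blob_id))
            .await
            .map_err(|(e, _)| Box::new(e) as DynError)
    }
}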

@@ -0,0 +1,2 @@
pub mod mempool;
pub mod network;

@@ -0,0 +1,100 @@
use crate::adapters::network::DispersalNetworkAdapter;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::dispersal::executor::behaviour::DispersalExecutorEvent;
use nomos_da_network_core::{PeerId, SubnetworkId};
use nomos_da_network_service::backends::libp2p::executor::{
DaNetworkEvent, DaNetworkEventKind, DaNetworkExecutorBackend, ExecutorDaNetworkMessage,
};
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use std::fmt::Debug;
use std::pin::Pin;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::oneshot;
pub struct Libp2pNetworkAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
outbound_relay: OutboundRelay<DaNetworkMsg<DaNetworkExecutorBackend<Membership>>>,
}
#[async_trait::async_trait]
impl<Membership> DispersalNetworkAdapter for Libp2pNetworkAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
{
type NetworkService = NetworkService<DaNetworkExecutorBackend<Membership>>;
type SubnetworkId = Membership::NetworkId;
fn new(outbound_relay: OutboundRelay<<Self::NetworkService as ServiceData>::Message>) -> Self {
Self { outbound_relay }
}
async fn disperse(
&self,
subnetwork_id: Self::SubnetworkId,
da_blob: DaBlob,
) -> Result<(), DynError> {
self.outbound_relay
.send(DaNetworkMsg::Process(
ExecutorDaNetworkMessage::RequestDispersal {
subnetwork_id,
da_blob: Box::new(da_blob),
},
))
.await
.map_err(|(e, _)| Box::new(e) as DynError)
}
async fn dispersal_events_stream(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = Result<(BlobId, Self::SubnetworkId), DynError>> + Send>>,
DynError,
> {
let (sender, receiver) = oneshot::channel();
self.outbound_relay
.send(DaNetworkMsg::Subscribe {
kind: DaNetworkEventKind::Dispersal,
sender,
})
.await
.map_err(|(e, _)| Box::new(e) as DynError)?;
receiver
.await
.map_err(|e| Box::new(e) as DynError)
.map(|stream| {
Box::pin(stream.filter_map(|event| async {
match event {
DaNetworkEvent::Sampling(_) => None,
DaNetworkEvent::Verifying(_) => None,
DaNetworkEvent::Dispersal(DispersalExecutorEvent::DispersalError {
error,
}) => Some(Err(Box::new(error) as DynError)),
DaNetworkEvent::Dispersal(DispersalExecutorEvent::DispersalSuccess {
blob_id,
subnetwork_id,
}) => Some(Ok((blob_id, subnetwork_id))),
}
}))
as BoxStream<'static, Result<(BlobId, Self::SubnetworkId), DynError>>
})
}
}

@@ -0,0 +1,28 @@
pub mod libp2p;
use futures::Stream;
use kzgrs_backend::common::blob::DaBlob;
use nomos_core::da::BlobId;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use std::pin::Pin;
#[async_trait::async_trait]
pub trait DispersalNetworkAdapter {
type NetworkService: ServiceData;
type SubnetworkId;
fn new(outbound_relay: OutboundRelay<<Self::NetworkService as ServiceData>::Message>) -> Self;
async fn disperse(
&self,
subnetwork_id: Self::SubnetworkId,
da_blob: DaBlob,
) -> Result<(), DynError>;
async fn dispersal_events_stream(
&self,
) -> Result<
Pin<Box<dyn Stream<Item = Result<(BlobId, Self::SubnetworkId), DynError>> + Send>>,
DynError,
>;
}

@@ -0,0 +1,163 @@
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
use crate::backend::DispersalBackend;
use futures::StreamExt;
use itertools::izip;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::{build_blob_id, Column, ColumnIndex};
use kzgrs_backend::encoder;
use kzgrs_backend::encoder::EncodedData;
use nomos_core::da::{BlobId, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
use std::sync::Arc;
use std::time::Duration;
pub struct DispersalKZGRSBackendSettings {
encoder_settings: encoder::DaEncoderParams,
dispersal_timeout: Duration,
}
pub struct DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter> {
settings: DispersalKZGRSBackendSettings,
network_adapter: Arc<NetworkAdapter>,
mempool_adapter: MempoolAdapter,
encoder: Arc<encoder::DaEncoder>,
}
pub struct DispersalFromAdapter<Adapter> {
adapter: Arc<Adapter>,
timeout: Duration,
}
// Remove this allow once the lint no longer fires; it is triggered inside the timeout call below (outside our handling)
#[allow(dependency_on_unit_never_type_fallback)]
#[async_trait::async_trait]
impl<Adapter> DaDispersal for DispersalFromAdapter<Adapter>
where
Adapter: DispersalNetworkAdapter + Send + Sync,
Adapter::SubnetworkId: From<usize> + Send + Sync,
{
type EncodedData = EncodedData;
type Error = DynError;
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> {
let adapter = self.adapter.as_ref();
let encoded_size = encoded_data.extended_data.len();
let blob_id = build_blob_id(
&encoded_data.aggregated_column_commitment,
&encoded_data.row_commitments,
);
let responses_stream = adapter.dispersal_events_stream().await?;
for (subnetwork_id, blob) in encoded_data_to_da_blobs(encoded_data).enumerate() {
adapter.disperse(subnetwork_id.into(), blob).await?;
}
let valid_responses = responses_stream
.filter_map(|event| async move {
match event {
Ok((_blob_id, _)) if _blob_id == blob_id => Some(()),
_ => None,
}
})
.take(encoded_size)
.collect();
// timeout when collecting positive responses
tokio::time::timeout(self.timeout, valid_responses)
.await
.map_err(|e| Box::new(e) as DynError)?;
Ok(())
}
}
#[async_trait::async_trait]
impl<NetworkAdapter, MempoolAdapter> DispersalBackend
for DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter>
where
NetworkAdapter: DispersalNetworkAdapter + Send + Sync,
NetworkAdapter::SubnetworkId: From<usize> + Send + Sync,
MempoolAdapter: DaMempoolAdapter<BlobId = BlobId> + Send + Sync,
{
type Settings = DispersalKZGRSBackendSettings;
type Encoder = encoder::DaEncoder;
type Dispersal = DispersalFromAdapter<NetworkAdapter>;
type NetworkAdapter = NetworkAdapter;
type MempoolAdapter = MempoolAdapter;
type BlobId = BlobId;
fn init(
settings: Self::Settings,
network_adapter: Self::NetworkAdapter,
mempool_adapter: Self::MempoolAdapter,
) -> Self {
let encoder = Self::Encoder::new(settings.encoder_settings.clone());
Self {
settings,
network_adapter: Arc::new(network_adapter),
mempool_adapter,
encoder: Arc::new(encoder),
}
}
async fn encode(
&self,
data: Vec<u8>,
) -> Result<(Self::BlobId, <Self::Encoder as DaEncoder>::EncodedData), DynError> {
let encoder = Arc::clone(&self.encoder);
// this is a REALLY heavy task, so we should try not to block the thread here
let heavy_task = tokio::task::spawn_blocking(move || encoder.encode(&data));
let encoded_data = heavy_task.await??;
let blob_id = build_blob_id(
&encoded_data.aggregated_column_commitment,
&encoded_data.row_commitments,
);
Ok((blob_id, encoded_data))
}
async fn disperse(
&self,
encoded_data: <Self::Encoder as DaEncoder>::EncodedData,
) -> Result<(), DynError> {
DispersalFromAdapter {
adapter: Arc::clone(&self.network_adapter),
timeout: self.settings.dispersal_timeout,
}
.disperse(encoded_data)
.await
}
async fn publish_to_mempool(&self, blob_id: Self::BlobId) -> Result<(), DynError> {
self.mempool_adapter.post_blob_id(blob_id).await
}
}
fn encoded_data_to_da_blobs(encoded_data: EncodedData) -> impl Iterator<Item = DaBlob> {
let EncodedData {
extended_data,
row_commitments,
rows_proofs,
column_commitments,
aggregated_column_commitment,
aggregated_column_proofs,
..
} = encoded_data;
let iter = izip!(
// transpose and unwrap the types, as we need ownership of them
extended_data.transposed().0.into_iter().map(|r| r.0),
column_commitments.into_iter(),
aggregated_column_proofs.into_iter(),
);
iter.enumerate().map(
move |(column_idx, (column, column_commitment, aggregated_column_proof))| DaBlob {
column: Column(column),
column_idx: column_idx as ColumnIndex,
column_commitment,
aggregated_column_commitment,
aggregated_column_proof,
rows_commitments: row_commitments.clone(),
rows_proofs: rows_proofs
.iter()
.map(|proofs| proofs[column_idx])
.collect(),
},
)
}

@@ -0,0 +1,39 @@
use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapter};
use nomos_core::da::{DaDispersal, DaEncoder};
use overwatch_rs::DynError;
pub mod kzgrs;
#[async_trait::async_trait]
pub trait DispersalBackend {
type Settings;
type Encoder: DaEncoder;
type Dispersal: DaDispersal<EncodedData = <Self::Encoder as DaEncoder>::EncodedData>;
type NetworkAdapter: DispersalNetworkAdapter;
type MempoolAdapter: DaMempoolAdapter;
type BlobId: Send;
fn init(
config: Self::Settings,
network_adapter: Self::NetworkAdapter,
mempool_adapter: Self::MempoolAdapter,
) -> Self;
async fn encode(
&self,
data: Vec<u8>,
) -> Result<(Self::BlobId, <Self::Encoder as DaEncoder>::EncodedData), DynError>;
async fn disperse(
&self,
encoded_data: <Self::Encoder as DaEncoder>::EncodedData,
) -> Result<(), DynError>;
async fn publish_to_mempool(&self, blob_id: Self::BlobId) -> Result<(), DynError>;
async fn process_dispersal(&self, data: Vec<u8>) -> Result<(), DynError> {
let (blob_id, encoded_data) = self.encode(data).await?;
self.disperse(encoded_data).await?;
self.publish_to_mempool(blob_id).await?;
Ok(())
}
}

@@ -0,0 +1,137 @@
// std
use std::fmt::Debug;
use std::marker::PhantomData;
// crates
use tokio::sync::oneshot;
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
use crate::backend::DispersalBackend;
use nomos_da_network_core::{PeerId, SubnetworkId};
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
use tracing::log::error;
mod adapters;
pub mod backend;
const DA_DISPERSAL_TAG: ServiceId = "DA-Dispersal";
#[derive(Debug)]
pub enum DaDispersalMsg {
Disperse {
blob: Vec<u8>,
reply_channel: oneshot::Sender<Result<(), DynError>>,
},
}
impl RelayMessage for DaDispersalMsg {}
#[derive(Clone)]
pub struct DispersalServiceSettings<BackendSettings> {
backend: BackendSettings,
}
pub struct DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter>,
Backend::Settings: Clone,
NetworkAdapter: DispersalNetworkAdapter,
MempoolAdapter: DaMempoolAdapter,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkAdapter::NetworkService>,
mempool_relay: Relay<MempoolAdapter::MempoolService>,
_backend: PhantomData<Backend>,
}
impl<Backend, NetworkAdapter, MempoolAdapter, Membership> ServiceData
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter>,
Backend::Settings: Clone,
NetworkAdapter: DispersalNetworkAdapter,
MempoolAdapter: DaMempoolAdapter,
{
const SERVICE_ID: ServiceId = DA_DISPERSAL_TAG;
type Settings = DispersalServiceSettings<Backend::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaDispersalMsg;
}
#[async_trait::async_trait]
impl<Backend, NetworkAdapter, MempoolAdapter, Membership> ServiceCore
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
+ Debug
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter, MempoolAdapter = MempoolAdapter>
+ Send
+ Sync,
Backend::Settings: Clone + Send + Sync,
NetworkAdapter: DispersalNetworkAdapter<SubnetworkId = Membership::NetworkId> + Send,
MempoolAdapter: DaMempoolAdapter,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let network_relay = service_state.overwatch_handle.relay();
let mempool_relay = service_state.overwatch_handle.relay();
Ok(Self {
service_state,
network_relay,
mempool_relay,
_backend: Default::default(),
})
}
async fn run(self) -> Result<(), DynError> {
let Self {
network_relay,
mempool_relay,
service_state,
..
} = self;
let DispersalServiceSettings {
backend: backend_settings,
} = service_state.settings_reader.get_updated_settings();
let network_relay = network_relay.connect().await?;
let network_adapter = NetworkAdapter::new(network_relay);
let mempool_relay = mempool_relay.connect().await?;
let mempool_adapter = MempoolAdapter::new(mempool_relay);
let backend = Backend::init(backend_settings, network_adapter, mempool_adapter);
let mut inbound_relay = service_state.inbound_relay;
while let Some(dispersal_msg) = inbound_relay.recv().await {
match dispersal_msg {
DaDispersalMsg::Disperse {
blob,
reply_channel,
} => {
if let Err(Err(e)) = reply_channel.send(backend.process_dispersal(blob).await) {
error!("Error forwarding dispersal response: {e}");
}
}
}
}
Ok(())
}
}
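For orientation, a minimal sketch of how a caller on an executor node could drive this service once it holds a connected relay to it. The `disperse_data` helper is illustrative and not part of this commit; only `DaDispersalMsg` and the reply-channel shape come from the code above, and the relay acquisition itself is omitted.

// Illustrative only: submit data for dispersal and wait for the service to
// report the outcome of encode -> disperse -> publish_to_mempool.
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::DynError;
use tokio::sync::oneshot;

async fn disperse_data(
    dispersal_relay: &OutboundRelay<DaDispersalMsg>,
    data: Vec<u8>,
) -> Result<(), DynError> {
    let (reply_channel, receiver) = oneshot::channel();
    dispersal_relay
        .send(DaDispersalMsg::Disperse {
            blob: data,
            reply_channel,
        })
        .await
        .map_err(|(e, _)| Box::new(e) as DynError)?;
    // The reply resolves once `Backend::process_dispersal` has finished.
    receiver.await.map_err(|e| Box::new(e) as DynError)?
}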

@@ -27,7 +27,7 @@ use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream};
/// Message that the backend replies to
#[derive(Debug)]
pub enum DaNetworkMessage {
pub enum ExecutorDaNetworkMessage {
/// Kickstart a network sampling
RequestSample {
subnetwork_id: SubnetworkId,
@@ -99,7 +99,7 @@ where
{
type Settings = DaNetworkExecutorBackendSettings<Membership>;
type State = NoState<Self::Settings>;
type Message = DaNetworkMessage;
type Message = ExecutorDaNetworkMessage;
type EventKind = DaNetworkEventKind;
type NetworkEvent = DaNetworkEvent;
@@ -190,13 +190,13 @@ where
async fn process(&self, msg: Self::Message) {
match msg {
DaNetworkMessage::RequestSample {
ExecutorDaNetworkMessage::RequestSample {
subnetwork_id,
blob_id,
} => {
handle_sample_request(&self.sampling_request_channel, subnetwork_id, blob_id).await;
}
DaNetworkMessage::RequestDispersal {
ExecutorDaNetworkMessage::RequestDispersal {
subnetwork_id,
da_blob,
} => {