Executor: Dispersal service mempool adapter (#810)

* Pass metadata when dispersing

* Sketch of executor dispersal mempool adapter
This commit is contained in:
gusto 2024-10-08 11:03:34 +03:00 committed by GitHub
parent a3cea9a09c
commit 22dfb51eba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 201 additions and 36 deletions

View File

@ -1,24 +1,27 @@
// std
use std::hash::{Hash, Hasher};
// crates
use nomos_core::da::blob::{self, metadata::Next};
use nomos_core::da::{
blob::{self, metadata::Next},
BlobId,
};
use serde::{Deserialize, Serialize};
// internal
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct BlobInfo {
id: [u8; 32],
id: BlobId,
metadata: Metadata,
}
impl BlobInfo {
pub fn new(id: [u8; 32], metadata: Metadata) -> Self {
pub fn new(id: BlobId, metadata: Metadata) -> Self {
Self { id, metadata }
}
}
impl blob::info::DispersedBlobInfo for BlobInfo {
type BlobId = [u8; 32];
type BlobId = BlobId;
fn blob_id(&self) -> Self::BlobId {
self.id
@ -63,6 +66,15 @@ impl Metadata {
}
}
impl blob::metadata::Metadata for Metadata {
type AppId = [u8; 32];
type Index = Index;
fn metadata(&self) -> (Self::AppId, Self::Index) {
(self.app_id, self.index)
}
}
impl From<u64> for Index {
fn from(value: u64) -> Self {
Self(value.to_be_bytes())

View File

@ -9,11 +9,13 @@ futures = "0.3"
itertools = "0.13"
nomos-core = { path = "../../../nomos-core" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-mempool = { path = "../../mempool", features = ["libp2p"] }
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-da-sampling = { path = "../sampling" }
nomos-mempool = { path = "../../mempool", features = ["libp2p"] }
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
rand = "0.8"
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
tokio = "1.40"
thiserror = "1.0"
tracing = "0.1"

View File

@ -0,0 +1,116 @@
// std
use std::hash::Hash;
use std::{fmt::Debug, marker::PhantomData};
// crates
use rand::{RngCore, SeedableRng};
use tokio::sync::oneshot;
// internal
use super::DaMempoolAdapter;
use kzgrs_backend::dispersal::{self, BlobInfo};
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::BlobId;
use nomos_core::header::HeaderId;
use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, DaMempoolService, MempoolMsg,
};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};
type MempoolRelay<Payload, Item, Key> = OutboundRelay<MempoolMsg<HeaderId, Payload, Item, Key>>;
pub struct KzgrsMempoolAdapter<
DaPoolAdapter,
DaPool,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> where
DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug,
DaPool::Item: Clone + Eq + Hash + Debug + 'static,
DaPool::Key: Debug + 'static,
{
pub mempool_relay: MempoolRelay<DaPoolAdapter::Payload, DaPool::Item, DaPool::Key>,
_phantom: PhantomData<(
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
)>,
}
#[async_trait::async_trait]
impl<
DaPoolAdapter,
DaPool,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> DaMempoolAdapter
for KzgrsMempoolAdapter<
DaPoolAdapter,
DaPool,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
DaPool: MemPool<BlockId = HeaderId, Key = BlobId>,
DaPoolAdapter: MempoolAdapter<Key = DaPool::Key, Payload = BlobInfo>,
DaPoolAdapter::Payload: DispersedBlobInfo + Into<DaPool::Item> + Debug + Send,
DaPool::Item: Clone + Eq + Hash + Debug + Send + 'static,
DaPool::Key: Debug + Send + 'static,
SamplingRng: SeedableRng + RngCore + Sync,
SamplingBackend: DaSamplingServiceBackend<SamplingRng, BlobId = DaPool::Key> + Send + Sync,
SamplingBackend::Settings: Clone,
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + Sync,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + Sync,
{
type MempoolService = DaMempoolService<
DaPoolAdapter,
DaPool,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>;
type BlobId = BlobId;
type Metadata = dispersal::Metadata;
fn new(mempool_relay: OutboundRelay<<Self::MempoolService as ServiceData>::Message>) -> Self {
Self {
mempool_relay,
_phantom: Default::default(),
}
}
async fn post_blob_id(
&self,
blob_id: Self::BlobId,
metadata: Self::Metadata,
) -> Result<(), DynError> {
let (reply_channel, receiver) = oneshot::channel();
self.mempool_relay
.send(MempoolMsg::Add {
payload: BlobInfo::new(blob_id, metadata),
key: blob_id,
reply_channel,
})
.await
.map_err(|(e, _)| Box::new(e) as DynError)?;
receiver
.await
.map_err(|e| Box::new(e) as DynError)?
.map_err(|_| "Failed to receive response from the mempool".into())
}
}

View File

@ -1,3 +1,5 @@
pub mod kzgrs;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
@ -6,8 +8,13 @@ use overwatch_rs::DynError;
pub trait DaMempoolAdapter {
type MempoolService: ServiceData;
type BlobId;
type Metadata;
fn new(outbound_relay: OutboundRelay<<Self::MempoolService as ServiceData>::Message>) -> Self;
async fn post_blob_id(&self, blob_id: Self::BlobId) -> Result<(), DynError>;
async fn post_blob_id(
&self,
blob_id: Self::BlobId,
metadata: Self::Metadata,
) -> Result<(), DynError>;
}

View File

@ -1,20 +1,23 @@
// std
use std::sync::Arc;
use std::time::Duration;
// crates
use futures::StreamExt;
use itertools::izip;
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
use crate::backend::DispersalBackend;
use futures::StreamExt;
use itertools::izip;
use kzgrs_backend::common::blob::DaBlob;
use kzgrs_backend::common::{build_blob_id, Column, ColumnIndex};
use kzgrs_backend::encoder;
use kzgrs_backend::encoder::EncodedData;
use kzgrs_backend::{dispersal, encoder};
use nomos_core::da::{BlobId, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
use std::sync::Arc;
use std::time::Duration;
pub struct DispersalKZGRSBackendSettings {
encoder_settings: encoder::DaEncoderParams,
dispersal_timeout: Duration,
pub encoder_settings: encoder::DaEncoderParams,
pub dispersal_timeout: Duration,
}
pub struct DispersalKZGRSBackend<NetworkAdapter, MempoolAdapter> {
settings: DispersalKZGRSBackendSettings,
@ -75,13 +78,14 @@ impl<NetworkAdapter, MempoolAdapter> DispersalBackend
where
NetworkAdapter: DispersalNetworkAdapter + Send + Sync,
NetworkAdapter::SubnetworkId: From<usize> + Send + Sync,
MempoolAdapter: DaMempoolAdapter<BlobId = BlobId> + Send + Sync,
MempoolAdapter: DaMempoolAdapter<BlobId = BlobId, Metadata = dispersal::Metadata> + Send + Sync,
{
type Settings = DispersalKZGRSBackendSettings;
type Encoder = encoder::DaEncoder;
type Dispersal = DispersalFromAdapter<NetworkAdapter>;
type NetworkAdapter = NetworkAdapter;
type MempoolAdapter = MempoolAdapter;
type Metadata = dispersal::Metadata;
type BlobId = BlobId;
fn init(
@ -125,8 +129,12 @@ where
.await
}
async fn publish_to_mempool(&self, blob_id: Self::BlobId) -> Result<(), DynError> {
self.mempool_adapter.post_blob_id(blob_id).await
async fn publish_to_mempool(
&self,
blob_id: Self::BlobId,
metadata: Self::Metadata,
) -> Result<(), DynError> {
self.mempool_adapter.post_blob_id(blob_id, metadata).await
}
}

View File

@ -1,6 +1,6 @@
use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapter};
use nomos_core::da::{DaDispersal, DaEncoder};
use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
pub mod kzgrs;
@ -12,6 +12,7 @@ pub trait DispersalBackend {
type Dispersal: DaDispersal<EncodedData = <Self::Encoder as DaEncoder>::EncodedData>;
type NetworkAdapter: DispersalNetworkAdapter;
type MempoolAdapter: DaMempoolAdapter;
type Metadata: metadata::Metadata + Send;
type BlobId: Send;
fn init(
@ -28,12 +29,20 @@ pub trait DispersalBackend {
encoded_data: <Self::Encoder as DaEncoder>::EncodedData,
) -> Result<(), DynError>;
async fn publish_to_mempool(&self, blob_id: Self::BlobId) -> Result<(), DynError>;
async fn publish_to_mempool(
&self,
blob_id: Self::BlobId,
metadata: Self::Metadata,
) -> Result<(), DynError>;
async fn process_dispersal(&self, data: Vec<u8>) -> Result<(), DynError> {
async fn process_dispersal(
&self,
data: Vec<u8>,
metadata: Self::Metadata,
) -> Result<(), DynError> {
let (blob_id, encoded_data) = self.encode(data).await?;
self.disperse(encoded_data).await?;
self.publish_to_mempool(blob_id).await?;
self.publish_to_mempool(blob_id, metadata).await?;
Ok(())
}
}

View File

@ -3,10 +3,12 @@ use std::fmt::Debug;
use std::marker::PhantomData;
// crates
use tokio::sync::oneshot;
use tracing::log::error;
// internal
use crate::adapters::mempool::DaMempoolAdapter;
use crate::adapters::network::DispersalNetworkAdapter;
use crate::backend::DispersalBackend;
use nomos_core::da::blob::metadata;
use nomos_da_network_core::{PeerId, SubnetworkId};
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
@ -14,7 +16,6 @@ use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use subnetworks_assignations::MembershipHandler;
use tracing::log::error;
mod adapters;
pub mod backend;
@ -22,21 +23,22 @@ pub mod backend;
const DA_DISPERSAL_TAG: ServiceId = "DA-Encoder";
#[derive(Debug)]
pub enum DaDispersalMsg {
pub enum DaDispersalMsg<Metadata> {
Disperse {
blob: Vec<u8>,
metadata: Metadata,
reply_channel: oneshot::Sender<Result<(), DynError>>,
},
}
impl RelayMessage for DaDispersalMsg {}
impl<Metadata: 'static> RelayMessage for DaDispersalMsg<Metadata> {}
#[derive(Clone)]
pub struct DispersalServiceSettings<BackendSettings> {
backend: BackendSettings,
pub backend: BackendSettings,
}
pub struct DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
pub struct DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership, Metadata>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
@ -44,10 +46,11 @@ where
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter>,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter, Metadata = Metadata>,
Backend::Settings: Clone,
NetworkAdapter: DispersalNetworkAdapter,
MempoolAdapter: DaMempoolAdapter,
Metadata: metadata::Metadata + Debug + 'static,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkAdapter::NetworkService>,
@ -55,8 +58,8 @@ where
_backend: PhantomData<Backend>,
}
impl<Backend, NetworkAdapter, MempoolAdapter, Membership> ServiceData
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
impl<Backend, NetworkAdapter, MempoolAdapter, Membership, Metadata> ServiceData
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership, Metadata>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
@ -64,21 +67,22 @@ where
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter>,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter, Metadata = Metadata>,
Backend::Settings: Clone,
NetworkAdapter: DispersalNetworkAdapter,
MempoolAdapter: DaMempoolAdapter,
Metadata: metadata::Metadata + Debug + 'static,
{
const SERVICE_ID: ServiceId = DA_DISPERSAL_TAG;
type Settings = DispersalServiceSettings<Backend::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaDispersalMsg;
type Message = DaDispersalMsg<Metadata>;
}
#[async_trait::async_trait]
impl<Backend, NetworkAdapter, MempoolAdapter, Membership> ServiceCore
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership>
impl<Backend, NetworkAdapter, MempoolAdapter, Membership, Metadata> ServiceCore
for DispersalService<Backend, NetworkAdapter, MempoolAdapter, Membership, Metadata>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
+ Clone
@ -86,12 +90,16 @@ where
+ Send
+ Sync
+ 'static,
Backend: DispersalBackend<NetworkAdapter = NetworkAdapter, MempoolAdapter = MempoolAdapter>
+ Send
Backend: DispersalBackend<
NetworkAdapter = NetworkAdapter,
MempoolAdapter = MempoolAdapter,
Metadata = Metadata,
> + Send
+ Sync,
Backend::Settings: Clone + Send + Sync,
NetworkAdapter: DispersalNetworkAdapter<SubnetworkId = Membership::NetworkId> + Send,
MempoolAdapter: DaMempoolAdapter,
Metadata: metadata::Metadata + Debug + Send + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let network_relay = service_state.overwatch_handle.relay();
@ -124,9 +132,12 @@ where
match dispersal_msg {
DaDispersalMsg::Disperse {
blob,
metadata,
reply_channel,
} => {
if let Err(Err(e)) = reply_channel.send(backend.process_dispersal(blob).await) {
if let Err(Err(e)) =
reply_channel.send(backend.process_dispersal(blob, metadata).await)
{
error!("Error forwarding dispersal response: {e}");
}
}