From f8a73529cf4a7dffb7611967043d4431f81d58e2 Mon Sep 17 00:00:00 2001 From: gusto Date: Fri, 6 Sep 2024 16:47:38 +0300 Subject: [PATCH] DA: Incoming sampling request handling (#730) * WIP: Sample message to behaviour massage translation * Sampling request handling * Sampling storage in node * Sampling rocksdb scaffold --- nodes/nomos-node/Cargo.toml | 2 +- nodes/nomos-node/src/api.rs | 30 ++++- nodes/nomos-node/src/lib.rs | 6 + .../core/src/protocols/sampling/behaviour.rs | 104 +++++++++++++--- .../core/src/protocols/sampling/mod.rs | 36 +++--- .../api/src/http/consensus/cryptarchia.rs | 26 +++- nomos-services/api/src/http/da.rs | 16 ++- nomos-services/api/src/http/mempool.rs | 13 +- .../cryptarchia-consensus/src/lib.rs | 16 ++- .../data-availability/indexer/src/lib.rs | 14 +++ .../network/src/backends/libp2p/validator.rs | 44 ++++++- .../data-availability/sampling/Cargo.toml | 2 + .../data-availability/sampling/src/lib.rs | 114 ++++++++++++------ .../sampling/src/storage/adapters/mod.rs | 2 + .../sampling/src/storage/adapters/rocksdb.rs | 59 +++++++++ .../sampling/src/storage/mod.rs | 25 ++++ .../data-availability/tests/src/common.rs | 5 + .../tests/src/verifier_integration.rs | 4 + .../verifier/src/network/adapters/libp2p.rs | 2 - nomos-services/mempool/src/da/service.rs | 15 ++- tests/src/nodes/nomos.rs | 6 +- 21 files changed, 447 insertions(+), 94 deletions(-) create mode 100644 nomos-services/data-availability/sampling/src/storage/adapters/mod.rs create mode 100644 nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs create mode 100644 nomos-services/data-availability/sampling/src/storage/mod.rs diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 5072c7ba..4ef91b4d 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -23,7 +23,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } -nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling" } +nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] } nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] } nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] } nomos-da-network-service = { path = "../../nomos-services/data-availability/network" } diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 2f5ee2cf..cf0f09b3 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -64,6 +64,7 @@ pub struct AxumBackend< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, > { settings: AxumBackendSettings, @@ -78,6 +79,7 @@ pub struct AxumBackend< _sampling_backend: core::marker::PhantomData, _sampling_network_adapter: core::marker::PhantomData, _sampling_rng: core::marker::PhantomData, + _sampling_storage: core::marker::PhantomData, } #[derive(OpenApi)] @@ -106,6 +108,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, > Backend for AxumBackend< @@ -120,6 +123,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, > where @@ -182,6 +186,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static, { type Error = hyper::Error; type Settings = AxumBackendSettings; @@ -203,6 +208,7 @@ where _sampling_backend: core::marker::PhantomData, _sampling_network_adapter: core::marker::PhantomData, _sampling_rng: core::marker::PhantomData, + _sampling_storage: core::marker::PhantomData, }) } @@ -240,6 +246,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >, ), @@ -253,6 +260,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >, ), @@ -269,6 +277,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >, ), @@ -279,7 +288,13 @@ where .route( "/mempool/add/blobinfo", routing::post( - add_blob_info::, + add_blob_info::< + V, + SamplingBackend, + SamplingNetworkAdapter, + SamplingRng, + SamplingStorage, + >, ), ) .route("/metrics", routing::get(get_metrics)) @@ -370,6 +385,7 @@ async fn cryptarchia_info< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( State(handle): State, @@ -393,6 +409,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { make_request_and_return_response!(consensus::cryptarchia_info::< Tx, @@ -400,6 +417,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >(&handle)) } @@ -418,6 +436,7 @@ async fn cryptarchia_headers< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( State(store): State, @@ -442,6 +461,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { let QueryParams { from, to } = query; make_request_and_return_response!(consensus::cryptarchia_headers::< @@ -450,6 +470,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >(&store, from, to)) } @@ -511,6 +532,7 @@ async fn get_range< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( State(handle): State, @@ -561,6 +583,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { make_request_and_return_response!(da::get_range::< Tx, @@ -570,6 +593,7 @@ where SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, SIZE, >(&handle, app_id, range)) } @@ -631,7 +655,7 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn add_blob_info( +async fn add_blob_info( State(handle): State, Json(blob_info): Json, ) -> Response @@ -654,6 +678,7 @@ where SamplingBackend::BlobId: Debug + 'static, SamplingAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static, SamplingRng: SeedableRng + RngCore + Send + 'static, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { make_request_and_return_response!(mempool::add_blob_info::< NetworkBackend, @@ -663,6 +688,7 @@ where SamplingBackend, SamplingAdapter, SamplingRng, + SamplingStorage, >(&handle, blob_info, DispersedBlobInfo::blob_id)) } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 9b5adf0a..c7b86d4d 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -23,6 +23,7 @@ use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBac use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend; use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter; use nomos_da_sampling::DaSamplingService; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter as VerifierNetworkAdapter; @@ -66,6 +67,7 @@ pub type NomosApiService = ApiService< KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, MB16, >, >; @@ -86,6 +88,7 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub type TxMempool = TxMempoolService< @@ -99,6 +102,7 @@ pub type DaMempool = DaMempoolService< KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub type DaIndexer = DataIndexerService< @@ -118,12 +122,14 @@ pub type DaIndexer = DataIndexerService< KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub type DaSampling = DaSamplingService< KzgrsSamplingBackend, SamplingLibp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub type DaVerifier = DaVerifierService< diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index 53f8184f..72d0568d 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -17,8 +17,10 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId, Stream}; use libp2p_stream::{Control, IncomingStreams, OpenStreamError}; +use nomos_core::da::BlobId; +use nomos_da_messages::sampling::sample_err::SampleErrType; use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes}; -use nomos_da_messages::{pack_message, unpack_from_reader}; +use nomos_da_messages::{common, pack_message, unpack_from_reader}; use subnetworks_assignations::MembershipHandler; use thiserror::Error; use tokio::sync::mpsc; @@ -57,7 +59,14 @@ pub enum SamplingError { error: bincode::Error, }, #[error("Error sending request: {request:?}")] - RequestChannel { request: SampleReq, peer_id: PeerId }, + RequestChannel { + request: BehaviourSampleReq, + peer_id: PeerId, + }, + #[error("Malformed blob id: {blob_id:?}")] + InvalidBlobId { peer_id: PeerId, blob_id: Vec }, + #[error("Blob not found: {blob_id:?}")] + BlobNotFound { peer_id: PeerId, blob_id: Vec }, #[error("Canceled response: {error}")] ResponseChannel { error: Canceled, peer_id: PeerId }, } @@ -71,6 +80,8 @@ impl SamplingError { SamplingError::Deserialize { peer_id, .. } => peer_id, SamplingError::RequestChannel { peer_id, .. } => peer_id, SamplingError::ResponseChannel { peer_id, .. } => peer_id, + SamplingError::InvalidBlobId { peer_id, .. } => peer_id, + SamplingError::BlobNotFound { peer_id, .. } => peer_id, } } @@ -132,13 +143,64 @@ impl Clone for SamplingError { peer_id: *peer_id, error: *error, }, + SamplingError::InvalidBlobId { blob_id, peer_id } => SamplingError::InvalidBlobId { + peer_id: *peer_id, + blob_id: blob_id.clone(), + }, + SamplingError::BlobNotFound { blob_id, peer_id } => SamplingError::BlobNotFound { + peer_id: *peer_id, + blob_id: blob_id.clone(), + }, } } } -/// Inner type representation of a Blob ID -// TODO: Use a proper type that is common to the codebase -type BlobId = [u8; 32]; +#[derive(Debug, Clone)] +pub struct BehaviourSampleReq { + pub blob_id: BlobId, +} + +impl TryFrom for BehaviourSampleReq { + type Error = Vec; + + fn try_from(req: SampleReq) -> Result { + let blob_id: BlobId = req.blob_id.try_into()?; + Ok(Self { blob_id }) + } +} + +#[derive(Debug)] +pub enum BehaviourSampleRes { + SamplingSuccess { + blob_id: BlobId, + subnetwork_id: SubnetworkId, + blob: Box, + }, + SampleNotFound { + blob_id: BlobId, + }, +} + +impl From for SampleRes { + fn from(res: BehaviourSampleRes) -> Self { + match res { + BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes { + message_type: Some(sample_res::MessageType::Blob(common::Blob { + blob_id: blob_id.to_vec(), + data: bincode::serialize(&blob) + .expect("Blob from service should be serializable"), + })), + }, + BehaviourSampleRes::SampleNotFound { blob_id } => SampleRes { + message_type: Some(sample_res::MessageType::Err(SampleErr { + blob_id: blob_id.into(), + err_type: SampleErrType::NotFound.into(), + err_description: "Sample not found".to_string(), + })), + }, + } + } +} #[derive(Debug)] pub enum SamplingEvent { @@ -149,8 +211,8 @@ pub enum SamplingEvent { blob: Box, }, IncomingSample { - request_receiver: Receiver, - response_sender: Sender, + request_receiver: Receiver, + response_sender: Sender, }, SamplingError { error: SamplingError, @@ -165,8 +227,8 @@ struct SampleStream { /// Auxiliary struct that binds where to send a request and the pair channel to listen for a response struct ResponseChannel { - request_sender: Sender, - response_receiver: Receiver, + request_sender: Sender, + response_receiver: Receiver, } type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream); @@ -350,6 +412,12 @@ where peer_id: stream.peer_id, error, })?; + let request = BehaviourSampleReq::try_from(request).map_err(|blob_id| { + SamplingError::InvalidBlobId { + peer_id: stream.peer_id, + blob_id, + } + })?; channel .request_sender .send(request) @@ -357,14 +425,14 @@ where request, peer_id: stream.peer_id, })?; - let response = - channel - .response_receiver - .await - .map_err(|error| SamplingError::ResponseChannel { - error, - peer_id: stream.peer_id, - })?; + let response: SampleRes = channel + .response_receiver + .await + .map_err(|error| SamplingError::ResponseChannel { + error, + peer_id: stream.peer_id, + })? + .into(); let bytes = pack_message(&response).map_err(|error| SamplingError::Io { peer_id: stream.peer_id, error, @@ -394,7 +462,7 @@ where fn schedule_incoming_stream_task( incoming_tasks: &mut FuturesUnordered, sample_stream: SampleStream, - ) -> (Receiver, Sender) { + ) -> (Receiver, Sender) { let (request_sender, request_receiver) = oneshot::channel(); let (response_sender, response_receiver) = oneshot::channel(); let channel = ResponseChannel { diff --git a/nomos-da/network/core/src/protocols/sampling/mod.rs b/nomos-da/network/core/src/protocols/sampling/mod.rs index da295211..b7a232e6 100644 --- a/nomos-da/network/core/src/protocols/sampling/mod.rs +++ b/nomos-da/network/core/src/protocols/sampling/mod.rs @@ -3,7 +3,9 @@ pub mod behaviour; #[cfg(test)] mod test { use crate::address_book::AddressBook; - use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent}; + use crate::protocols::sampling::behaviour::{ + BehaviourSampleRes, SamplingBehaviour, SamplingEvent, + }; use crate::test_utils::AllNeighbours; use crate::SubnetworkId; use futures::StreamExt; @@ -13,8 +15,6 @@ mod test { use libp2p::swarm::SwarmEvent; use libp2p::{quic, Multiaddr, PeerId, Swarm, SwarmBuilder}; use log::debug; - use nomos_da_messages::common::Blob; - use nomos_da_messages::sampling::SampleRes; use std::time::Duration; use subnetworks_assignations::MembershipHandler; use tracing_subscriber::fmt::TestWriter; @@ -103,24 +103,18 @@ mod test { // spawn here because otherwise we block polling tokio::spawn(request_receiver); response_sender - .send(SampleRes { - message_type: Some( - nomos_da_messages::sampling::sample_res::MessageType::Blob( - Blob { - blob_id: vec![], - data: bincode::serialize(&DaBlob { - column_idx: 0, - column: Column(vec![]), - column_commitment: Default::default(), - aggregated_column_commitment: Default::default(), - aggregated_column_proof: Default::default(), - rows_commitments: vec![], - rows_proofs: vec![], - }) - .unwrap(), - }, - ), - ), + .send(BehaviourSampleRes::SamplingSuccess { + blob_id: Default::default(), + subnetwork_id: Default::default(), + blob: Box::new(DaBlob { + column: Column(vec![]), + column_idx: 0, + column_commitment: Default::default(), + aggregated_column_commitment: Default::default(), + aggregated_column_proof: Default::default(), + rows_commitments: vec![], + rows_proofs: vec![], + }), }) .unwrap() } diff --git a/nomos-services/api/src/http/consensus/cryptarchia.rs b/nomos-services/api/src/http/consensus/cryptarchia.rs index 79312222..bd78a1e8 100644 --- a/nomos-services/api/src/http/consensus/cryptarchia.rs +++ b/nomos-services/api/src/http/consensus/cryptarchia.rs @@ -31,6 +31,7 @@ pub type Cryptarchia< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, > = CryptarchiaConsensus< ConsensusNetworkAdapter, @@ -44,6 +45,7 @@ pub type Cryptarchia< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, >; pub async fn cryptarchia_info< @@ -52,6 +54,7 @@ pub async fn cryptarchia_info< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( handle: &OverwatchHandle, @@ -75,9 +78,18 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); @@ -95,6 +107,7 @@ pub async fn cryptarchia_headers< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( handle: &OverwatchHandle, @@ -120,9 +133,18 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index be36dc4b..6acd8755 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -40,6 +40,7 @@ pub type DaIndexer< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, > = DataIndexerService< // Indexer specific. @@ -58,6 +59,7 @@ pub type DaIndexer< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, >; pub type DaVerifier = @@ -107,6 +109,7 @@ pub async fn get_range< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, const SIZE: usize, >( handle: &OverwatchHandle, @@ -157,9 +160,20 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { let relay = handle - .relay::>() + .relay::>() .connect() .await?; let (sender, receiver) = oneshot::channel(); diff --git a/nomos-services/api/src/http/mempool.rs b/nomos-services/api/src/http/mempool.rs index 369e70dc..4452fa3e 100644 --- a/nomos-services/api/src/http/mempool.rs +++ b/nomos-services/api/src/http/mempool.rs @@ -45,7 +45,16 @@ where } } -pub async fn add_blob_info( +pub async fn add_blob_info< + N, + A, + Item, + Key, + SamplingBackend, + SamplingAdapter, + SamplingRng, + SamplingStorage, +>( handle: &overwatch_rs::overwatch::handle::OverwatchHandle, item: A::Payload, converter: impl Fn(&A::Payload) -> Key, @@ -63,6 +72,7 @@ where SamplingBackend::Settings: Clone, SamplingAdapter: DaSamplingNetworkAdapter + Send, SamplingRng: SeedableRng + RngCore + Send, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { let relay = handle .relay::>() .connect() .await?; diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index eb850f98..92f89a61 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -152,6 +152,7 @@ pub struct CryptarchiaConsensus< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where A: NetworkAdapter, ClPoolAdapter: MempoolAdapter, @@ -173,6 +174,7 @@ pub struct CryptarchiaConsensus< SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly @@ -186,9 +188,12 @@ pub struct CryptarchiaConsensus< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, >, >, - sampling_relay: Relay>, + sampling_relay: Relay< + DaSamplingService, + >, block_subscription_sender: broadcast::Sender>, storage_relay: Relay>, } @@ -205,6 +210,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > ServiceData for CryptarchiaConsensus< A, @@ -218,6 +224,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where A: NetworkAdapter, @@ -239,6 +246,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { const SERVICE_ID: ServiceId = CRYPTARCHIA_ID; type Settings = CryptarchiaSettings; @@ -260,6 +268,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > ServiceCore for CryptarchiaConsensus< A, @@ -273,6 +282,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where A: NetworkAdapter @@ -323,6 +333,7 @@ where SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); @@ -481,6 +492,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > CryptarchiaConsensus< A, @@ -494,6 +506,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where A: NetworkAdapter + Clone + Send + Sync + 'static, @@ -536,6 +549,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { async fn should_stop_service(message: LifecycleMessage) -> bool { match message { diff --git a/nomos-services/data-availability/indexer/src/lib.rs b/nomos-services/data-availability/indexer/src/lib.rs index f3d4e506..3f9c0a7a 100644 --- a/nomos-services/data-availability/indexer/src/lib.rs +++ b/nomos-services/data-availability/indexer/src/lib.rs @@ -42,6 +42,7 @@ pub type ConsensusRelay< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > = Relay< CryptarchiaConsensus< A, @@ -55,6 +56,7 @@ pub type ConsensusRelay< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, >, >; @@ -73,6 +75,7 @@ pub struct DataIndexerService< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where B: 'static, A: NetworkAdapter, @@ -96,6 +99,7 @@ pub struct DataIndexerService< SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { service_state: ServiceStateHandle, storage_relay: Relay>, @@ -111,6 +115,7 @@ pub struct DataIndexerService< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, >, } @@ -155,6 +160,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > ServiceData for DataIndexerService< B, @@ -171,6 +177,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where B: 'static, @@ -195,6 +202,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { const SERVICE_ID: ServiceId = "DaIndexer"; type Settings = IndexerSettings; @@ -218,6 +226,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > DataIndexerService< B, @@ -234,6 +243,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where B: Send + Sync + 'static, @@ -259,6 +269,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { async fn handle_new_block( storage_adapter: &DaStorage, @@ -323,6 +334,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > ServiceCore for DataIndexerService< B, @@ -339,6 +351,7 @@ impl< SamplingBackend, SamplingNetworkAdapter, SamplingRng, + SamplingStorage, > where B: Debug + Send + Sync, @@ -387,6 +400,7 @@ where SamplingBackend::Blob: Debug + 'static, SamplingBackend::BlobId: Debug + 'static, SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter, + SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter, { fn init(service_state: ServiceStateHandle) -> Result { let consensus_relay = service_state.overwatch_handle.relay(); diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index 33d0c8ec..f60ad009 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -6,7 +6,9 @@ use libp2p::{Multiaddr, PeerId}; use log::error; use nomos_core::da::BlobId; use nomos_da_network_core::protocols::sampling; -use nomos_da_network_core::protocols::sampling::behaviour::SamplingError; +use nomos_da_network_core::protocols::sampling::behaviour::{ + BehaviourSampleReq, BehaviourSampleRes, SamplingError, +}; use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; use nomos_da_network_core::SubnetworkId; use nomos_libp2p::secret_key_serde; @@ -17,9 +19,9 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::pin::Pin; use subnetworks_assignations::MembershipHandler; -use tokio::sync::broadcast; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{broadcast, mpsc}; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; @@ -49,6 +51,11 @@ pub enum DaNetworkEventKind { pub enum SamplingEvent { /// A success sampling SamplingSuccess { blob_id: BlobId, blob: Box }, + /// Incoming sampling request + SamplingRequest { + blob_id: BlobId, + response_sender: mpsc::Sender>, + }, /// A failed sampling error SamplingError { error: SamplingError }, } @@ -214,8 +221,37 @@ async fn handle_validator_events_stream( error!("Error in internal broadcast of sampling success: {e:?}"); } } - sampling::behaviour::SamplingEvent::IncomingSample{ .. } => { - unimplemented!("Handle request/response from Sampling service"); + sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => { + if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await { + let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1); + + if let Err(e) = sampling_broadcast_sender + .send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender }) + { + error!("Error in internal broadcast of sampling request: {e:?}"); + sampling_response_receiver.close() + } + + if let Some(maybe_blob) = sampling_response_receiver.recv().await { + let result = match maybe_blob { + Some(blob) => BehaviourSampleRes::SamplingSuccess { + blob_id, + subnetwork_id: blob.column_idx as u32, + blob: Box::new(blob), + }, + None => BehaviourSampleRes::SampleNotFound { blob_id }, + }; + + if response_sender.send(result).is_err() { + error!("Error sending sampling success response"); + } + } else if response_sender + .send(BehaviourSampleRes::SampleNotFound { blob_id }) + .is_err() + { + error!("Error sending sampling success response"); + } + } } sampling::behaviour::SamplingEvent::SamplingError{ error } => { if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) { diff --git a/nomos-services/data-availability/sampling/Cargo.toml b/nomos-services/data-availability/sampling/Cargo.toml index d917e99f..e89f3db6 100644 --- a/nomos-services/data-availability/sampling/Cargo.toml +++ b/nomos-services/data-availability/sampling/Cargo.toml @@ -13,6 +13,7 @@ libp2p-identity = { version = "0.2" } nomos-core = { path = "../../../nomos-core" } nomos-da-network-core = { path = "../../../nomos-da/network/core" } nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } +nomos-da-storage = { path = "../../../nomos-da/storage" } nomos-storage = { path = "../../../nomos-services/storage" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } @@ -28,3 +29,4 @@ rand_chacha = "0.3.1" [features] default = ["libp2p"] libp2p = [] +rocksdb-backend = ["nomos-storage/rocksdb-backend"] diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 4bb3b6b3..18e880fd 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -1,29 +1,31 @@ pub mod backend; pub mod network; +pub mod storage; // std use std::collections::BTreeSet; use std::fmt::Debug; - // crates -use rand::prelude::*; -use tokio_stream::StreamExt; -use tracing::{error, span, Instrument, Level}; -// internal -use backend::{DaSamplingServiceBackend, SamplingState}; use kzgrs_backend::common::blob::DaBlob; use network::NetworkAdapter; use nomos_core::da::BlobId; use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; use nomos_da_network_service::NetworkService; +use nomos_storage::StorageService; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; use overwatch_rs::DynError; +use rand::{RngCore, SeedableRng}; use serde::{Deserialize, Serialize}; use tokio::sync::oneshot; +use tokio_stream::StreamExt; +use tracing::{error, span, Instrument, Level}; +// internal +use backend::{DaSamplingServiceBackend, SamplingState}; +use storage::DaStorageAdapter; const DA_SAMPLING_TAG: ServiceId = "DA-Sampling"; @@ -41,35 +43,40 @@ pub enum DaSamplingServiceMsg { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DaSamplingServiceSettings { +pub struct DaSamplingServiceSettings { pub sampling_settings: BackendSettings, pub network_adapter_settings: NetworkSettings, + pub storage_adapter_settings: StorageSettings, } impl RelayMessage for DaSamplingServiceMsg {} -pub struct DaSamplingService +pub struct DaSamplingService where - R: SeedableRng + RngCore, - Backend: DaSamplingServiceBackend + Send, + SamplingRng: SeedableRng + RngCore, + Backend: DaSamplingServiceBackend + Send, Backend::Settings: Clone, Backend::Blob: Debug + 'static, Backend::BlobId: Debug + 'static, - N: NetworkAdapter, - N::Settings: Clone, + DaNetwork: NetworkAdapter, + DaNetwork::Settings: Clone, + DaStorage: DaStorageAdapter, { - network_relay: Relay>, + network_relay: Relay>, + storage_relay: Relay>, service_state: ServiceStateHandle, sampler: Backend, } -impl DaSamplingService +impl + DaSamplingService where - R: SeedableRng + RngCore, - Backend: DaSamplingServiceBackend + Send + 'static, + SamplingRng: SeedableRng + RngCore, + Backend: DaSamplingServiceBackend + Send + 'static, Backend::Settings: Clone, - N: NetworkAdapter + Send + 'static, - N::Settings: Clone, + DaNetwork: NetworkAdapter + Send + 'static, + DaNetwork::Settings: Clone, + DaStorage: DaStorageAdapter, { async fn should_stop_service(message: LifecycleMessage) -> bool { match message { @@ -88,7 +95,7 @@ where async fn handle_service_message( msg: ::Message, - network_adapter: &mut N, + network_adapter: &mut DaNetwork, sampler: &mut Backend, ) { match msg { @@ -117,7 +124,11 @@ where } } - async fn handle_sampling_message(event: SamplingEvent, sampler: &mut Backend) { + async fn handle_sampling_message( + event: SamplingEvent, + sampler: &mut Backend, + storage_adapter: &DaStorage, + ) { match event { SamplingEvent::SamplingSuccess { blob_id, blob } => { sampler.handle_sampling_success(blob_id, *blob).await; @@ -129,35 +140,59 @@ where } error!("Error while sampling: {error}"); } + SamplingEvent::SamplingRequest { + blob_id, + response_sender, + } => match storage_adapter.get_blob(blob_id).await { + Ok(maybe_blob) => { + if response_sender.send(maybe_blob).await.is_err() { + error!("Error sending sampling response"); + } + } + Err(error) => { + error!("Failed to get blob from storage adapter: {error}"); + if response_sender.send(None).await.is_err() { + error!("Error sending sampling response"); + } + } + }, } } } -impl ServiceData for DaSamplingService +impl ServiceData + for DaSamplingService where - R: SeedableRng + RngCore, - Backend: DaSamplingServiceBackend + Send, + SamplingRng: SeedableRng + RngCore, + Backend: DaSamplingServiceBackend + Send, Backend::Settings: Clone, Backend::Blob: Debug + 'static, Backend::BlobId: Debug + 'static, - N: NetworkAdapter, - N::Settings: Clone, + DaNetwork: NetworkAdapter, + DaNetwork::Settings: Clone, + DaStorage: DaStorageAdapter, { const SERVICE_ID: ServiceId = DA_SAMPLING_TAG; - type Settings = DaSamplingServiceSettings; + type Settings = + DaSamplingServiceSettings; type State = NoState; type StateOperator = NoOperator; type Message = DaSamplingServiceMsg; } #[async_trait::async_trait] -impl ServiceCore for DaSamplingService +impl ServiceCore + for DaSamplingService where - R: SeedableRng + RngCore, - Backend: DaSamplingServiceBackend + Send + Sync + 'static, + SamplingRng: SeedableRng + RngCore, + Backend: DaSamplingServiceBackend + + Send + + Sync + + 'static, Backend::Settings: Clone + Send + Sync + 'static, - N: NetworkAdapter + Send + Sync + 'static, - N::Settings: Clone + Send + Sync + 'static, + DaNetwork: NetworkAdapter + Send + Sync + 'static, + DaNetwork::Settings: Clone + Send + Sync + 'static, + DaStorage: DaStorageAdapter + Sync + Send, { fn init(service_state: ServiceStateHandle) -> Result { let DaSamplingServiceSettings { @@ -165,10 +200,12 @@ where } = service_state.settings_reader.get_updated_settings(); let network_relay = service_state.overwatch_handle.relay(); - let rng = R::from_entropy(); + let storage_relay = service_state.overwatch_handle.relay(); + let rng = SamplingRng::from_entropy(); Ok(Self { network_relay, + storage_relay, service_state, sampler: Backend::new(sampling_settings, rng), }) @@ -181,17 +218,24 @@ where // position of his bls public key landing in the above-mentioned list. let Self { network_relay, + storage_relay, mut service_state, mut sampler, } = self; - let DaSamplingServiceSettings { .. } = service_state.settings_reader.get_updated_settings(); + let DaSamplingServiceSettings { + storage_adapter_settings, + .. + } = service_state.settings_reader.get_updated_settings(); let network_relay = network_relay.connect().await?; - let mut network_adapter = N::new(network_relay).await; + let mut network_adapter = DaNetwork::new(network_relay).await; let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?; let mut next_prune_tick = sampler.prune_interval(); + let storage_relay = storage_relay.connect().await?; + let storage_adapter = DaStorage::new(storage_adapter_settings, storage_relay).await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); async { loop { @@ -200,7 +244,7 @@ where Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await; } Some(sampling_message) = sampling_message_stream.next() => { - Self::handle_sampling_message(sampling_message, &mut sampler).await; + Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await; } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { diff --git a/nomos-services/data-availability/sampling/src/storage/adapters/mod.rs b/nomos-services/data-availability/sampling/src/storage/adapters/mod.rs new file mode 100644 index 00000000..663a2ff3 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/storage/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "rocksdb-backend")] +pub mod rocksdb; diff --git a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs new file mode 100644 index 00000000..303754f6 --- /dev/null +++ b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs @@ -0,0 +1,59 @@ +// std +use serde::{Deserialize, Serialize}; +use std::{marker::PhantomData, path::PathBuf}; +// crates +use nomos_core::da::blob::Blob; +use nomos_storage::{ + backends::{rocksdb::RocksBackend, StorageSerde}, + StorageMsg, StorageService, +}; +use overwatch_rs::{ + services::{relay::OutboundRelay, ServiceData}, + DynError, +}; +// internal +use crate::storage::DaStorageAdapter; + +pub struct RocksAdapter +where + S: StorageSerde + Send + Sync + 'static, +{ + _settings: RocksAdapterSettings, + _storage_relay: OutboundRelay>>, + blob: PhantomData, +} + +#[async_trait::async_trait] +impl DaStorageAdapter for RocksAdapter +where + S: StorageSerde + Send + Sync + 'static, + B: Blob + Clone + Send + Sync + 'static, + B::BlobId: Send, +{ + type Backend = RocksBackend; + type Blob = B; + type Settings = RocksAdapterSettings; + + async fn new( + _settings: Self::Settings, + _storage_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self { + Self { + _settings, + _storage_relay, + blob: PhantomData, + } + } + + async fn get_blob( + &self, + _blob_id: ::BlobId, + ) -> Result, DynError> { + todo!() + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RocksAdapterSettings { + pub blob_storage_directory: PathBuf, +} diff --git a/nomos-services/data-availability/sampling/src/storage/mod.rs b/nomos-services/data-availability/sampling/src/storage/mod.rs new file mode 100644 index 00000000..c0433f3f --- /dev/null +++ b/nomos-services/data-availability/sampling/src/storage/mod.rs @@ -0,0 +1,25 @@ +pub mod adapters; + +use nomos_core::da::blob::Blob; +use nomos_storage::{backends::StorageBackend, StorageService}; +use overwatch_rs::{ + services::{relay::OutboundRelay, ServiceData}, + DynError, +}; + +#[async_trait::async_trait] +pub trait DaStorageAdapter { + type Backend: StorageBackend + Send + Sync + 'static; + type Settings: Clone + Send + Sync + 'static; + type Blob: Blob + Clone; + + async fn new( + settings: Self::Settings, + storage_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self; + + async fn get_blob( + &self, + blob_id: ::BlobId, + ) -> Result, DynError>; +} diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 80d7d5ce..845a954c 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -5,6 +5,7 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter; use nomos_da_indexer::DataIndexerService; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter; use nomos_da_sampling::DaSamplingService; use nomos_da_sampling::{ backend::kzgrs::KzgrsSamplingBackend, @@ -42,12 +43,14 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< KzgrsSamplingBackend, SamplingLibp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub type DaSampling = DaSamplingService< KzgrsSamplingBackend, SamplingLibp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub(crate) type DaIndexer = DataIndexerService< @@ -67,6 +70,7 @@ pub(crate) type DaIndexer = DataIndexerService< KzgrsSamplingBackend, SamplingLibp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub(crate) type TxMempool = TxMempoolService< @@ -80,6 +84,7 @@ pub type DaMempool = DaMempoolService< KzgrsSamplingBackend, nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter, ChaCha20Rng, + SamplingStorageAdapter, >; pub(crate) type DaVerifier = DaVerifierService< diff --git a/nomos-services/data-availability/tests/src/verifier_integration.rs b/nomos-services/data-availability/tests/src/verifier_integration.rs index d31aa895..80bc6141 100644 --- a/nomos-services/data-availability/tests/src/verifier_integration.rs +++ b/nomos-services/data-availability/tests/src/verifier_integration.rs @@ -21,6 +21,7 @@ use nomos_da_network_service::NetworkConfig as DaNetworkConfig; use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings; use nomos_da_sampling::DaSamplingServiceSettings; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings; @@ -164,6 +165,9 @@ fn new_node( num_samples: 0, subnet_size: 0, }, + storage_adapter_settings: SamplingStorageSettings { + blob_storage_directory: blobs_dir.clone(), + }, }, }, None, diff --git a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs index 80ac05dd..137caef8 100644 --- a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs @@ -17,8 +17,6 @@ use tokio_stream::StreamExt; // internal use crate::network::NetworkAdapter; -pub const NOMOS_DA_TOPIC: &str = "NomosDa"; - pub struct Libp2pAdapter where M: MembershipHandler diff --git a/nomos-services/mempool/src/da/service.rs b/nomos-services/mempool/src/da/service.rs index 2b7686a7..0a1384a4 100644 --- a/nomos-services/mempool/src/da/service.rs +++ b/nomos-services/mempool/src/da/service.rs @@ -12,6 +12,7 @@ use std::fmt::Debug; // #[cfg(feature = "metrics")] // use super::metrics::Metrics; use futures::StreamExt; +use nomos_da_sampling::storage::DaStorageAdapter; use rand::{RngCore, SeedableRng}; // internal use crate::backend::MemPool; @@ -33,7 +34,7 @@ use overwatch_rs::services::{ }; use tracing::error; -pub struct DaMempoolService +pub struct DaMempoolService where N: NetworkAdapter, N::Payload: DispersedBlobInfo + Into + Debug + 'static, @@ -47,18 +48,19 @@ where DB::BlobId: Debug + 'static, DB::Settings: Clone, DN: DaSamplingNetworkAdapter, + SamplingStorage: DaStorageAdapter, R: SeedableRng + RngCore, { service_state: ServiceStateHandle, network_relay: Relay>, - sampling_relay: Relay>, + sampling_relay: Relay>, pool: P, // TODO: Add again after metrics refactor // #[cfg(feature = "metrics")] // metrics: Option, } -impl ServiceData for DaMempoolService +impl ServiceData for DaMempoolService where N: NetworkAdapter, N::Payload: DispersedBlobInfo + Debug + Into + 'static, @@ -72,6 +74,7 @@ where DB::BlobId: Debug + 'static, DB::Settings: Clone, DN: DaSamplingNetworkAdapter, + DaStorage: DaStorageAdapter, R: SeedableRng + RngCore, { const SERVICE_ID: ServiceId = "mempool-da"; @@ -87,7 +90,7 @@ where } #[async_trait::async_trait] -impl ServiceCore for DaMempoolService +impl ServiceCore for DaMempoolService where P: MemPool + Send + 'static, P::Settings: Clone + Send + Sync + 'static, @@ -102,6 +105,7 @@ where DB::BlobId: Debug + 'static, DB::Settings: Clone, DN: DaSamplingNetworkAdapter, + DaStorage: DaStorageAdapter, R: SeedableRng + RngCore, { fn init(service_state: ServiceStateHandle) -> Result { @@ -178,7 +182,7 @@ where } } -impl DaMempoolService +impl DaMempoolService where P: MemPool + Send + 'static, P::Settings: Clone + Send + Sync + 'static, @@ -193,6 +197,7 @@ where DB::BlobId: Debug + 'static, DB::Settings: Clone, DN: DaSamplingNetworkAdapter, + DaStorage: DaStorageAdapter, R: SeedableRng + RngCore, { async fn should_stop_service(message: LifecycleMessage) -> bool { diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 3cb49f2c..176899a6 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -22,6 +22,7 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as Indexe use nomos_da_indexer::IndexerSettings; use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings; use nomos_da_network_service::NetworkConfig as DaNetworkConfig; +use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; use nomos_da_verifier::DaVerifierServiceSettings; @@ -130,7 +131,7 @@ impl NomosNode { pub async fn get_block(&self, id: HeaderId) -> Option> { CLIENT - .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) + .post(format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) .header("Content-Type", "application/json") .body(serde_json::to_string(&id).unwrap()) .send() @@ -434,6 +435,9 @@ fn create_node_config( num_samples: 0, subnet_size: 0, }, + storage_adapter_settings: SamplingStorageAdapterSettings { + blob_storage_directory: "./".into(), + }, }, };