diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 940ab309..5072c7ba 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -17,6 +17,7 @@ http = "0.2.9" hex = "0.4.3" kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" } +nomos-da-network-core = { path = "../../nomos-da/network/core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" diff --git a/nodes/nomos-node/src/api.rs b/nodes/nomos-node/src/api.rs index 80a45571..99c77447 100644 --- a/nodes/nomos-node/src/api.rs +++ b/nodes/nomos-node/src/api.rs @@ -21,8 +21,10 @@ use nomos_core::da::blob::info::DispersedBlobInfo; use nomos_core::da::blob::metadata::Metadata; use nomos_core::da::DaVerifier as CoreDaVerifier; use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction}; +use nomos_da_network_core::SubnetworkId; use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_da_verifier::backend::VerifierBackend; +use nomos_libp2p::PeerId; use nomos_mempool::{ network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, tx::service::openapi::Status, MempoolMetrics, @@ -32,6 +34,7 @@ use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; use rand::{RngCore, SeedableRng}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use subnetworks_assignations::MembershipHandler; use tower_http::{ cors::{Any, CorsLayer}, trace::TraceLayer, @@ -53,6 +56,7 @@ pub struct AxumBackend< A, B, C, + M, V, VB, T, @@ -66,6 +70,7 @@ pub struct AxumBackend< _attestation: core::marker::PhantomData, _blob: core::marker::PhantomData, _certificate: core::marker::PhantomData, + _membership: core::marker::PhantomData, _vid: core::marker::PhantomData, _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, @@ -93,6 +98,7 @@ impl< A, B, C, + M, V, VB, T, @@ -106,6 +112,7 @@ impl< A, B, C, + M, V, VB, T, @@ -129,6 +136,12 @@ where + Sync + 'static, ::BlobId: Clone + Send + Sync, + M: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, V: DispersedBlobInfo + From + Eq @@ -180,6 +193,7 @@ where _attestation: core::marker::PhantomData, _blob: core::marker::PhantomData, _certificate: core::marker::PhantomData, + _membership: core::marker::PhantomData, _vid: core::marker::PhantomData, _verifier_backend: core::marker::PhantomData, _tx: core::marker::PhantomData, @@ -241,7 +255,7 @@ where >, ), ) - .route("/da/add_blob", routing::post(add_blob::)) + .route("/da/add_blob", routing::post(add_blob::)) .route( "/da/get_range", routing::post( @@ -441,7 +455,7 @@ where (status = 500, description = "Internal server error", body = String), ) )] -async fn add_blob( +async fn add_blob( State(handle): State, Json(blob): Json, ) -> Response @@ -450,12 +464,18 @@ where B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, ::BlobId: AsRef<[u8]> + Send + Sync + 'static, ::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static, + M: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, VB: VerifierBackend + CoreDaVerifier, ::Settings: Clone, ::Error: Error, SS: StorageSerde + Send + Sync + 'static, { - make_request_and_return_response!(da::add_blob::(&handle, blob)) + make_request_and_return_response!(da::add_blob::(&handle, blob)) } #[derive(Serialize, Deserialize)] diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index f173d98c..7de7c518 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -58,6 +58,7 @@ pub type NomosApiService = ApiService< (), DaBlob, BlobInfo, + NomosDaMembership, BlobInfo, KzgrsDaVerifier, Tx, @@ -124,7 +125,7 @@ pub type DaSampling = DaSamplingService< pub type DaVerifier = DaVerifierService< KzgrsDaVerifier, - VerifierNetworkAdapter, + VerifierNetworkAdapter, VerifierStorageAdapter<(), DaBlob, Wire>, >; diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 3f4acdc9..0784fd5e 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -31,6 +31,8 @@ full-replication = { path = "../../nomos-da/full-replication" } kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" } rand = "0.8" serde = { version = "1", features = ["derive"] } +subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" } +nomos-da-network-core = { path = "../../nomos-da/network/core" } tokio = { version = "1.33", default-features = false, features = ["sync"] } diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index e8f43ed7..e7b510c8 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -10,11 +10,13 @@ use nomos_da_indexer::DaMsg; use nomos_da_indexer::{ consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter, DataIndexerService, }; +use nomos_da_network_core::SubnetworkId; use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_da_verifier::backend::VerifierBackend; use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter; use nomos_da_verifier::{DaVerifierMsg, DaVerifierService}; +use nomos_libp2p::PeerId; use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; use nomos_storage::backends::rocksdb::RocksBackend; @@ -27,6 +29,7 @@ use serde::Serialize; use std::error::Error; use std::fmt::Debug; use std::hash::Hash; +use subnetworks_assignations::MembershipHandler; use tokio::sync::oneshot; pub type DaIndexer< @@ -57,10 +60,10 @@ pub type DaIndexer< SamplingRng, >; -pub type DaVerifier = - DaVerifierService, VerifierStorageAdapter>; +pub type DaVerifier = + DaVerifierService, VerifierStorageAdapter>; -pub async fn add_blob( +pub async fn add_blob( handle: &OverwatchHandle, blob: B, ) -> Result, DynError> @@ -69,12 +72,21 @@ where B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, ::BlobId: AsRef<[u8]> + Send + Sync + 'static, ::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static, + M: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, VB: VerifierBackend + CoreDaVerifier, ::Settings: Clone, ::Error: Error, SS: StorageSerde + Send + Sync + 'static, { - let relay = handle.relay::>().connect().await?; + let relay = handle + .relay::>() + .connect() + .await?; let (sender, receiver) = oneshot::channel(); relay .send(DaVerifierMsg::AddBlob { diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 9421ec95..b9781ec2 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -28,12 +28,12 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } rand = "0.8" rand_chacha = "0.3" +subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1.15" tempfile = "3.6" tracing = "0.1" time = "0.3" -subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } [dev-dependencies] blake2 = { version = "0.10" } diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 7995bbd5..42dffd52 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -5,6 +5,7 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter; use nomos_da_indexer::DataIndexerService; +use nomos_da_sampling::DaSamplingService; use nomos_da_sampling::{ backend::kzgrs::KzgrsSamplingBackend, network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter, @@ -18,7 +19,7 @@ use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAda use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; use nomos_storage::backends::rocksdb::RocksBackend; use rand_chacha::ChaCha20Rng; -use subnetworks_assignations::versions::v2::FillWithOriginalReplication; +use subnetworks_assignations::versions::v1::FillFromNodeList; pub use nomos_core::{ da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, @@ -26,6 +27,9 @@ pub use nomos_core::{ use nomos_mempool::da::service::DaMempoolService; use nomos_node::{Tx, Wire}; +/// Membership used by the DA Network service. +pub type NomosDaMembership = FillFromNodeList; + pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, MockPool::Hash>, @@ -36,7 +40,13 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< FillSizeWithBlobs, RocksBackend, KzgrsSamplingBackend, - SamplingLibp2pAdapter, + SamplingLibp2pAdapter, + ChaCha20Rng, +>; + +pub type DaSampling = DaSamplingService< + KzgrsSamplingBackend, + SamplingLibp2pAdapter, ChaCha20Rng, >; @@ -55,7 +65,7 @@ pub(crate) type DaIndexer = DataIndexerService< FillSizeWithBlobs, RocksBackend, KzgrsSamplingBackend, - SamplingLibp2pAdapter, + SamplingLibp2pAdapter, ChaCha20Rng, >; @@ -71,7 +81,7 @@ pub(crate) type DaMempool = DaMempoolService< pub(crate) type DaVerifier = DaVerifierService< KzgrsDaVerifier, - Libp2pAdapter, + Libp2pAdapter, VerifierStorageAdapter<(), DaBlob, Wire>, >; diff --git a/nomos-services/data-availability/tests/src/verifier_integration.rs b/nomos-services/data-availability/tests/src/verifier_integration.rs index e57f20cd..d31aa895 100644 --- a/nomos-services/data-availability/tests/src/verifier_integration.rs +++ b/nomos-services/data-availability/tests/src/verifier_integration.rs @@ -14,9 +14,18 @@ use nomos_core::da::{blob::info::DispersedBlobInfo, DaEncoder as _}; use nomos_core::tx::Transaction; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings; use nomos_da_indexer::IndexerSettings; +use nomos_da_network_service::backends::libp2p::validator::{ + DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings, +}; +use nomos_da_network_service::NetworkConfig as DaNetworkConfig; +use nomos_da_network_service::NetworkService as DaNetworkService; +use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings; +use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings; +use nomos_da_sampling::DaSamplingServiceSettings; use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings; use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings; use nomos_da_verifier::DaVerifierServiceSettings; +use nomos_libp2p::{ed25519, identity, PeerId}; use nomos_libp2p::{Multiaddr, SwarmConfig}; use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; @@ -28,6 +37,7 @@ use overwatch_derive::*; use overwatch_rs::overwatch::{Overwatch, OverwatchRunner}; use overwatch_rs::services::handle::ServiceHandle; use rand::{thread_rng, Rng, RngCore}; +use subnetworks_assignations::versions::v1::FillFromNodeList; use tempfile::{NamedTempFile, TempDir}; use time::OffsetDateTime; // internal @@ -43,12 +53,14 @@ struct ClientNode { #[derive(Services)] struct VerifierNode { network: ServiceHandle>, + da_network: ServiceHandle>>, cl_mempool: ServiceHandle, da_mempool: ServiceHandle, storage: ServiceHandle>>, cryptarchia: ServiceHandle, indexer: ServiceHandle, verifier: ServiceHandle, + da_sampling: ServiceHandle, } // Client node is just an empty overwatch service to spawn a task that could communicate with other @@ -87,6 +99,18 @@ fn new_node( initial_peers, }, }, + da_network: DaNetworkConfig { + backend: DaNetworkValidatorBackendSettings { + node_key: ed25519::SecretKey::generate(), + membership: FillFromNodeList::new( + &[PeerId::from(identity::Keypair::generate_ed25519().public())], + 2, + 1, + ), + addresses: Default::default(), + listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::().unwrap(), + }, + }, cl_mempool: TxMempoolSettings { backend: (), network: AdapterSettings { @@ -128,6 +152,19 @@ fn new_node( blob_storage_directory: blobs_dir.clone(), }, }, + da_sampling: DaSamplingServiceSettings { + // TODO: setup this properly! + sampling_settings: KzgrsSamplingBackendSettings { + num_samples: 0, + // Sampling service period can't be zero. + old_blobs_check_interval: Duration::from_secs(1), + blobs_validity_duration: Duration::from_secs(1), + }, + network_adapter_settings: DaNetworkSamplingSettings { + num_samples: 0, + subnet_size: 0, + }, + }, }, None, ) diff --git a/nomos-services/data-availability/verifier/Cargo.toml b/nomos-services/data-availability/verifier/Cargo.toml index 5575e062..074fb1f1 100644 --- a/nomos-services/data-availability/verifier/Cargo.toml +++ b/nomos-services/data-availability/verifier/Cargo.toml @@ -10,17 +10,20 @@ bytes = "1.2" futures = "0.3" hex = "0.4.3" kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" } +libp2p = { version = "0.54", features = ["ed25519"] } nomos-core = { path = "../../../nomos-core" } nomos-da-storage = { path = "../../../nomos-da/storage" } -nomos-network = { path = "../../../nomos-services/network" } +nomos-da-network-core = { path = "../../../nomos-da/network/core" } +nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" } nomos-storage = { path = "../../../nomos-services/storage" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } serde = { version = "1.0", features = ["derive"] } +subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" } tokio = { version = "1", features = ["sync", "macros"] } tokio-stream = "0.1.15" tracing = "0.1" [features] rocksdb-backend = ["nomos-storage/rocksdb-backend"] -libp2p = ["nomos-network/libp2p"] +libp2p = [] diff --git a/nomos-services/data-availability/verifier/src/lib.rs b/nomos-services/data-availability/verifier/src/lib.rs index 3339abb5..b7b688f6 100644 --- a/nomos-services/data-availability/verifier/src/lib.rs +++ b/nomos-services/data-availability/verifier/src/lib.rs @@ -17,7 +17,7 @@ use tracing::{error, span, Instrument, Level}; // internal use backend::VerifierBackend; use network::NetworkAdapter; -use nomos_network::NetworkService; +use nomos_da_network_service::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; @@ -66,7 +66,7 @@ where Backend::DaBlob: Debug + Send, Backend::Error: Error + Send + Sync, Backend::Settings: Clone, - N: NetworkAdapter + Send + 'static, + N: NetworkAdapter + Send + 'static, N::Settings: Clone, S: DaStorageAdapter + Send + 'static, { @@ -124,7 +124,7 @@ where Backend::Settings: Clone + Send + Sync + 'static, Backend::DaBlob: Debug + Send + Sync + 'static, Backend::Error: Error + Send + Sync + 'static, - N: NetworkAdapter + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, N::Settings: Clone + Send + Sync + 'static, S: DaStorageAdapter + Send + Sync + 'static, S::Settings: Clone + Send + Sync + 'static, @@ -173,27 +173,24 @@ where loop { tokio::select! { Some(blob) = blob_stream.next() => { - match Self::handle_new_blob(&verifier,&storage_adapter, &blob).await { - Ok(attestation) => if let Err(err) = network_adapter.send_attestation(attestation).await { - error!("Error replying attestation {err:?}"); - }, - Err(err) => error!("Error handling blob {blob:?} due to {err:?}"), + if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await { + error!("Error handling blob {blob:?} due to {err:?}"); } } Some(msg) = service_state.inbound_relay.recv() => { - let DaVerifierMsg::AddBlob { blob, reply_channel } = msg; - match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await { - Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) { + let DaVerifierMsg::AddBlob { blob, reply_channel } = msg; + match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await { + Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) { + error!("Error replying attestation {err:?}"); + }, + Err(err) => { + error!("Error handling blob {blob:?} due to {err:?}"); + if let Err(err) = reply_channel.send(None) { error!("Error replying attestation {err:?}"); - }, - Err(err) => { - error!("Error handling blob {blob:?} due to {err:?}"); - if let Err(err) = reply_channel.send(None) { - error!("Error replying attestation {err:?}"); - } - }, - }; - } + } + }, + }; + } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { break; diff --git a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs index 3386e20b..80ac05dd 100644 --- a/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs +++ b/nomos-services/data-availability/verifier/src/network/adapters/libp2p.rs @@ -1,107 +1,79 @@ // std -use futures::Stream; -use overwatch_rs::DynError; +use std::fmt::Debug; use std::marker::PhantomData; // crates - -// internal -use crate::network::NetworkAdapter; -use nomos_core::wire; -use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash}; -use nomos_network::{NetworkMsg, NetworkService}; +use futures::Stream; +use kzgrs_backend::common::blob::DaBlob; +use libp2p::PeerId; +use nomos_da_network_core::SubnetworkId; +use nomos_da_network_service::backends::libp2p::validator::{ + DaNetworkEvent, DaNetworkEventKind, DaNetworkValidatorBackend, +}; +use nomos_da_network_service::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -use serde::de::DeserializeOwned; -use serde::Serialize; -use tokio_stream::wrappers::BroadcastStream; +use subnetworks_assignations::MembershipHandler; use tokio_stream::StreamExt; -use tracing::debug; +// internal +use crate::network::NetworkAdapter; pub const NOMOS_DA_TOPIC: &str = "NomosDa"; -pub struct Libp2pAdapter { - network_relay: OutboundRelay< as ServiceData>::Message>, - _blob: PhantomData, - _attestation: PhantomData, -} - -impl Libp2pAdapter +pub struct Libp2pAdapter where - B: Serialize + DeserializeOwned + Send + Sync + 'static, - A: Serialize + DeserializeOwned + Send + Sync + 'static, + M: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, { - async fn stream_for(&self) -> Box + Unpin + Send> { - let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC); - let (sender, receiver) = tokio::sync::oneshot::channel(); - self.network_relay - .send(NetworkMsg::Subscribe { - kind: EventKind::Message, - sender, - }) - .await - .expect("Network backend should be ready"); - let receiver = receiver.await.unwrap(); - Box::new(Box::pin(BroadcastStream::new(receiver).filter_map( - move |msg| match msg { - Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => { - match wire::deserialize::(&data) { - Ok(msg) => Some(msg), - Err(e) => { - debug!("Unrecognized message: {e}"); - None - } - } - } - _ => None, - }, - ))) - } - - async fn send(&self, data: E) -> Result<(), DynError> { - let message = wire::serialize(&data)?.into_boxed_slice(); - self.network_relay - .send(NetworkMsg::Process(Command::Broadcast { - topic: NOMOS_DA_TOPIC.to_string(), - message, - })) - .await - .map_err(|(e, _)| Box::new(e) as DynError) - } + network_relay: + OutboundRelay<> as ServiceData>::Message>, + _membership: PhantomData, } #[async_trait::async_trait] -impl NetworkAdapter for Libp2pAdapter +impl NetworkAdapter for Libp2pAdapter where - B: Serialize + DeserializeOwned + Send + Sync + 'static, - A: Serialize + DeserializeOwned + Send + Sync + 'static, + M: MembershipHandler + + Clone + + Debug + + Send + + Sync + + 'static, { - type Backend = Libp2p; + type Backend = DaNetworkValidatorBackend; type Settings = (); - type Blob = B; - type Attestation = A; + type Blob = DaBlob; async fn new( _settings: Self::Settings, network_relay: OutboundRelay< as ServiceData>::Message>, ) -> Self { - network_relay - .send(NetworkMsg::Process(Command::Subscribe( - NOMOS_DA_TOPIC.to_string(), - ))) - .await - .expect("Network backend should be ready"); Self { network_relay, - _blob: Default::default(), - _attestation: Default::default(), + _membership: Default::default(), } } async fn blob_stream(&self) -> Box + Unpin + Send> { - self.stream_for::().await - } + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.network_relay + .send(nomos_da_network_service::DaNetworkMsg::Subscribe { + kind: DaNetworkEventKind::Verifying, + sender, + }) + .await + .expect("Network backend should be ready"); - async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> { - self.send(attestation).await + let receiver = receiver.await.expect("Blob stream should be received"); + + let stream = receiver.filter_map(move |msg| match msg { + DaNetworkEvent::Verifying(blob) => Some(*blob), + _ => None, + }); + + Box::new(Box::pin(stream)) } } diff --git a/nomos-services/data-availability/verifier/src/network/mod.rs b/nomos-services/data-availability/verifier/src/network/mod.rs index 83e63735..001a6398 100644 --- a/nomos-services/data-availability/verifier/src/network/mod.rs +++ b/nomos-services/data-availability/verifier/src/network/mod.rs @@ -1,19 +1,17 @@ pub mod adapters; use futures::Stream; -use nomos_network::backends::NetworkBackend; -use nomos_network::NetworkService; +use nomos_da_network_service::backends::NetworkBackend; +use nomos_da_network_service::NetworkService; use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; -use overwatch_rs::DynError; #[async_trait::async_trait] pub trait NetworkAdapter { - type Backend: NetworkBackend + 'static; + type Backend: NetworkBackend + Send + 'static; type Settings; type Blob; - type Attestation; async fn new( settings: Self::Settings, @@ -21,5 +19,4 @@ pub trait NetworkAdapter { ) -> Self; async fn blob_stream(&self) -> Box + Unpin + Send>; - async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>; }