DA network in verifier libp2p adapter (#715)
* Use da network in verifier libp2p adapter * VerifierAdapter in da tests * Simplify verifier adapter * Start da_network in da tests
This commit is contained in:
parent
a13f8611ee
commit
93509bc09f
|
@ -17,6 +17,7 @@ http = "0.2.9"
|
|||
hex = "0.4.3"
|
||||
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
|
||||
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
|
||||
nomos-da-network-core = { path = "../../nomos-da/network/core" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -21,8 +21,10 @@ use nomos_core::da::blob::info::DispersedBlobInfo;
|
|||
use nomos_core::da::blob::metadata::Metadata;
|
||||
use nomos_core::da::DaVerifier as CoreDaVerifier;
|
||||
use nomos_core::{da::blob::Blob, header::HeaderId, tx::Transaction};
|
||||
use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_sampling::backend::DaSamplingServiceBackend;
|
||||
use nomos_da_verifier::backend::VerifierBackend;
|
||||
use nomos_libp2p::PeerId;
|
||||
use nomos_mempool::{
|
||||
network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
|
||||
tx::service::openapi::Status, MempoolMetrics,
|
||||
|
@ -32,6 +34,7 @@ use nomos_storage::backends::StorageSerde;
|
|||
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||
use rand::{RngCore, SeedableRng};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tower_http::{
|
||||
cors::{Any, CorsLayer},
|
||||
trace::TraceLayer,
|
||||
|
@ -53,6 +56,7 @@ pub struct AxumBackend<
|
|||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
|
@ -66,6 +70,7 @@ pub struct AxumBackend<
|
|||
_attestation: core::marker::PhantomData<A>,
|
||||
_blob: core::marker::PhantomData<B>,
|
||||
_certificate: core::marker::PhantomData<C>,
|
||||
_membership: core::marker::PhantomData<M>,
|
||||
_vid: core::marker::PhantomData<V>,
|
||||
_verifier_backend: core::marker::PhantomData<VB>,
|
||||
_tx: core::marker::PhantomData<T>,
|
||||
|
@ -93,6 +98,7 @@ impl<
|
|||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
|
@ -106,6 +112,7 @@ impl<
|
|||
A,
|
||||
B,
|
||||
C,
|
||||
M,
|
||||
V,
|
||||
VB,
|
||||
T,
|
||||
|
@ -129,6 +136,12 @@ where
|
|||
+ Sync
|
||||
+ 'static,
|
||||
<C as DispersedBlobInfo>::BlobId: Clone + Send + Sync,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
V: DispersedBlobInfo<BlobId = [u8; 32]>
|
||||
+ From<C>
|
||||
+ Eq
|
||||
|
@ -180,6 +193,7 @@ where
|
|||
_attestation: core::marker::PhantomData,
|
||||
_blob: core::marker::PhantomData,
|
||||
_certificate: core::marker::PhantomData,
|
||||
_membership: core::marker::PhantomData,
|
||||
_vid: core::marker::PhantomData,
|
||||
_verifier_backend: core::marker::PhantomData,
|
||||
_tx: core::marker::PhantomData,
|
||||
|
@ -241,7 +255,7 @@ where
|
|||
>,
|
||||
),
|
||||
)
|
||||
.route("/da/add_blob", routing::post(add_blob::<A, B, VB, S>))
|
||||
.route("/da/add_blob", routing::post(add_blob::<A, B, M, VB, S>))
|
||||
.route(
|
||||
"/da/get_range",
|
||||
routing::post(
|
||||
|
@ -441,7 +455,7 @@ where
|
|||
(status = 500, description = "Internal server error", body = String),
|
||||
)
|
||||
)]
|
||||
async fn add_blob<A, B, VB, SS>(
|
||||
async fn add_blob<A, B, M, VB, SS>(
|
||||
State(handle): State<OverwatchHandle>,
|
||||
Json(blob): Json<B>,
|
||||
) -> Response
|
||||
|
@ -450,12 +464,18 @@ where
|
|||
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
|
||||
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
|
||||
<VB as VerifierBackend>::Settings: Clone,
|
||||
<VB as CoreDaVerifier>::Error: Error,
|
||||
SS: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
make_request_and_return_response!(da::add_blob::<A, B, VB, SS>(&handle, blob))
|
||||
make_request_and_return_response!(da::add_blob::<A, B, M, VB, SS>(&handle, blob))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
|
|
@ -58,6 +58,7 @@ pub type NomosApiService = ApiService<
|
|||
(),
|
||||
DaBlob,
|
||||
BlobInfo,
|
||||
NomosDaMembership,
|
||||
BlobInfo,
|
||||
KzgrsDaVerifier,
|
||||
Tx,
|
||||
|
@ -124,7 +125,7 @@ pub type DaSampling = DaSamplingService<
|
|||
|
||||
pub type DaVerifier = DaVerifierService<
|
||||
KzgrsDaVerifier,
|
||||
VerifierNetworkAdapter<DaBlob, ()>,
|
||||
VerifierNetworkAdapter<FillFromNodeList>,
|
||||
VerifierStorageAdapter<(), DaBlob, Wire>,
|
||||
>;
|
||||
|
||||
|
|
|
@ -31,6 +31,8 @@ full-replication = { path = "../../nomos-da/full-replication" }
|
|||
kzgrs-backend = { path = "../../nomos-da/kzgrs-backend" }
|
||||
rand = "0.8"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
subnetworks-assignations = { path = "../../nomos-da/network/subnetworks-assignations" }
|
||||
nomos-da-network-core = { path = "../../nomos-da/network/core" }
|
||||
tokio = { version = "1.33", default-features = false, features = ["sync"] }
|
||||
|
||||
|
||||
|
|
|
@ -10,11 +10,13 @@ use nomos_da_indexer::DaMsg;
|
|||
use nomos_da_indexer::{
|
||||
consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter, DataIndexerService,
|
||||
};
|
||||
use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_sampling::backend::DaSamplingServiceBackend;
|
||||
use nomos_da_verifier::backend::VerifierBackend;
|
||||
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter;
|
||||
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapter as VerifierStorageAdapter;
|
||||
use nomos_da_verifier::{DaVerifierMsg, DaVerifierService};
|
||||
use nomos_libp2p::PeerId;
|
||||
use nomos_mempool::backend::mockpool::MockPool;
|
||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
|
||||
use nomos_storage::backends::rocksdb::RocksBackend;
|
||||
|
@ -27,6 +29,7 @@ use serde::Serialize;
|
|||
use std::error::Error;
|
||||
use std::fmt::Debug;
|
||||
use std::hash::Hash;
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub type DaIndexer<
|
||||
|
@ -57,10 +60,10 @@ pub type DaIndexer<
|
|||
SamplingRng,
|
||||
>;
|
||||
|
||||
pub type DaVerifier<A, B, VB, SS> =
|
||||
DaVerifierService<VB, Libp2pAdapter<B, A>, VerifierStorageAdapter<A, B, SS>>;
|
||||
pub type DaVerifier<A, B, M, VB, SS> =
|
||||
DaVerifierService<VB, Libp2pAdapter<M>, VerifierStorageAdapter<A, B, SS>>;
|
||||
|
||||
pub async fn add_blob<A, B, VB, SS>(
|
||||
pub async fn add_blob<A, B, M, VB, SS>(
|
||||
handle: &OverwatchHandle,
|
||||
blob: B,
|
||||
) -> Result<Option<()>, DynError>
|
||||
|
@ -69,12 +72,21 @@ where
|
|||
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
|
||||
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
|
||||
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
|
||||
<VB as VerifierBackend>::Settings: Clone,
|
||||
<VB as CoreDaVerifier>::Error: Error,
|
||||
SS: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
let relay = handle.relay::<DaVerifier<A, B, VB, SS>>().connect().await?;
|
||||
let relay = handle
|
||||
.relay::<DaVerifier<A, B, M, VB, SS>>()
|
||||
.connect()
|
||||
.await?;
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
relay
|
||||
.send(DaVerifierMsg::AddBlob {
|
||||
|
|
|
@ -28,12 +28,12 @@ overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806"
|
|||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
rand = "0.8"
|
||||
rand_chacha = "0.3"
|
||||
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
tokio-stream = "0.1.15"
|
||||
tempfile = "3.6"
|
||||
tracing = "0.1"
|
||||
time = "0.3"
|
||||
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
|
||||
|
||||
[dev-dependencies]
|
||||
blake2 = { version = "0.10" }
|
||||
|
|
|
@ -5,6 +5,7 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa
|
|||
use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter;
|
||||
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter;
|
||||
use nomos_da_indexer::DataIndexerService;
|
||||
use nomos_da_sampling::DaSamplingService;
|
||||
use nomos_da_sampling::{
|
||||
backend::kzgrs::KzgrsSamplingBackend,
|
||||
network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter,
|
||||
|
@ -18,7 +19,7 @@ use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAda
|
|||
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
|
||||
use nomos_storage::backends::rocksdb::RocksBackend;
|
||||
use rand_chacha::ChaCha20Rng;
|
||||
use subnetworks_assignations::versions::v2::FillWithOriginalReplication;
|
||||
use subnetworks_assignations::versions::v1::FillFromNodeList;
|
||||
|
||||
pub use nomos_core::{
|
||||
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
|
||||
|
@ -26,6 +27,9 @@ pub use nomos_core::{
|
|||
use nomos_mempool::da::service::DaMempoolService;
|
||||
use nomos_node::{Tx, Wire};
|
||||
|
||||
/// Membership used by the DA Network service.
|
||||
pub type NomosDaMembership = FillFromNodeList;
|
||||
|
||||
pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
|
||||
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
|
||||
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
|
||||
|
@ -36,7 +40,13 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
|
|||
FillSizeWithBlobs<MB16, BlobInfo>,
|
||||
RocksBackend<Wire>,
|
||||
KzgrsSamplingBackend<ChaCha20Rng>,
|
||||
SamplingLibp2pAdapter<FillWithOriginalReplication>,
|
||||
SamplingLibp2pAdapter<NomosDaMembership>,
|
||||
ChaCha20Rng,
|
||||
>;
|
||||
|
||||
pub type DaSampling = DaSamplingService<
|
||||
KzgrsSamplingBackend<ChaCha20Rng>,
|
||||
SamplingLibp2pAdapter<NomosDaMembership>,
|
||||
ChaCha20Rng,
|
||||
>;
|
||||
|
||||
|
@ -55,7 +65,7 @@ pub(crate) type DaIndexer = DataIndexerService<
|
|||
FillSizeWithBlobs<MB16, BlobInfo>,
|
||||
RocksBackend<Wire>,
|
||||
KzgrsSamplingBackend<ChaCha20Rng>,
|
||||
SamplingLibp2pAdapter<FillWithOriginalReplication>,
|
||||
SamplingLibp2pAdapter<NomosDaMembership>,
|
||||
ChaCha20Rng,
|
||||
>;
|
||||
|
||||
|
@ -71,7 +81,7 @@ pub(crate) type DaMempool = DaMempoolService<
|
|||
|
||||
pub(crate) type DaVerifier = DaVerifierService<
|
||||
KzgrsDaVerifier,
|
||||
Libp2pAdapter<DaBlob, ()>,
|
||||
Libp2pAdapter<NomosDaMembership>,
|
||||
VerifierStorageAdapter<(), DaBlob, Wire>,
|
||||
>;
|
||||
|
||||
|
|
|
@ -14,9 +14,18 @@ use nomos_core::da::{blob::info::DispersedBlobInfo, DaEncoder as _};
|
|||
use nomos_core::tx::Transaction;
|
||||
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageSettings;
|
||||
use nomos_da_indexer::IndexerSettings;
|
||||
use nomos_da_network_service::backends::libp2p::validator::{
|
||||
DaNetworkValidatorBackend, DaNetworkValidatorBackendSettings,
|
||||
};
|
||||
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
|
||||
use nomos_da_network_service::NetworkService as DaNetworkService;
|
||||
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
|
||||
use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings;
|
||||
use nomos_da_sampling::DaSamplingServiceSettings;
|
||||
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
|
||||
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings;
|
||||
use nomos_da_verifier::DaVerifierServiceSettings;
|
||||
use nomos_libp2p::{ed25519, identity, PeerId};
|
||||
use nomos_libp2p::{Multiaddr, SwarmConfig};
|
||||
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
|
||||
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
|
||||
|
@ -28,6 +37,7 @@ use overwatch_derive::*;
|
|||
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
|
||||
use overwatch_rs::services::handle::ServiceHandle;
|
||||
use rand::{thread_rng, Rng, RngCore};
|
||||
use subnetworks_assignations::versions::v1::FillFromNodeList;
|
||||
use tempfile::{NamedTempFile, TempDir};
|
||||
use time::OffsetDateTime;
|
||||
// internal
|
||||
|
@ -43,12 +53,14 @@ struct ClientNode {
|
|||
#[derive(Services)]
|
||||
struct VerifierNode {
|
||||
network: ServiceHandle<NetworkService<NetworkBackend>>,
|
||||
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
|
||||
cl_mempool: ServiceHandle<TxMempool>,
|
||||
da_mempool: ServiceHandle<DaMempool>,
|
||||
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
|
||||
cryptarchia: ServiceHandle<Cryptarchia>,
|
||||
indexer: ServiceHandle<DaIndexer>,
|
||||
verifier: ServiceHandle<DaVerifier>,
|
||||
da_sampling: ServiceHandle<DaSampling>,
|
||||
}
|
||||
|
||||
// Client node is just an empty overwatch service to spawn a task that could communicate with other
|
||||
|
@ -87,6 +99,18 @@ fn new_node(
|
|||
initial_peers,
|
||||
},
|
||||
},
|
||||
da_network: DaNetworkConfig {
|
||||
backend: DaNetworkValidatorBackendSettings {
|
||||
node_key: ed25519::SecretKey::generate(),
|
||||
membership: FillFromNodeList::new(
|
||||
&[PeerId::from(identity::Keypair::generate_ed25519().public())],
|
||||
2,
|
||||
1,
|
||||
),
|
||||
addresses: Default::default(),
|
||||
listening_address: "/ip4/127.0.0.1/udp/0/quic-v1".parse::<Multiaddr>().unwrap(),
|
||||
},
|
||||
},
|
||||
cl_mempool: TxMempoolSettings {
|
||||
backend: (),
|
||||
network: AdapterSettings {
|
||||
|
@ -128,6 +152,19 @@ fn new_node(
|
|||
blob_storage_directory: blobs_dir.clone(),
|
||||
},
|
||||
},
|
||||
da_sampling: DaSamplingServiceSettings {
|
||||
// TODO: setup this properly!
|
||||
sampling_settings: KzgrsSamplingBackendSettings {
|
||||
num_samples: 0,
|
||||
// Sampling service period can't be zero.
|
||||
old_blobs_check_interval: Duration::from_secs(1),
|
||||
blobs_validity_duration: Duration::from_secs(1),
|
||||
},
|
||||
network_adapter_settings: DaNetworkSamplingSettings {
|
||||
num_samples: 0,
|
||||
subnet_size: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
None,
|
||||
)
|
||||
|
|
|
@ -10,17 +10,20 @@ bytes = "1.2"
|
|||
futures = "0.3"
|
||||
hex = "0.4.3"
|
||||
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
|
||||
libp2p = { version = "0.54", features = ["ed25519"] }
|
||||
nomos-core = { path = "../../../nomos-core" }
|
||||
nomos-da-storage = { path = "../../../nomos-da/storage" }
|
||||
nomos-network = { path = "../../../nomos-services/network" }
|
||||
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
|
||||
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
|
||||
nomos-storage = { path = "../../../nomos-services/storage" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
subnetworks-assignations = { path = "../../../nomos-da/network/subnetworks-assignations" }
|
||||
tokio = { version = "1", features = ["sync", "macros"] }
|
||||
tokio-stream = "0.1.15"
|
||||
tracing = "0.1"
|
||||
|
||||
[features]
|
||||
rocksdb-backend = ["nomos-storage/rocksdb-backend"]
|
||||
libp2p = ["nomos-network/libp2p"]
|
||||
libp2p = []
|
||||
|
|
|
@ -17,7 +17,7 @@ use tracing::{error, span, Instrument, Level};
|
|||
// internal
|
||||
use backend::VerifierBackend;
|
||||
use network::NetworkAdapter;
|
||||
use nomos_network::NetworkService;
|
||||
use nomos_da_network_service::NetworkService;
|
||||
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||
use overwatch_rs::services::relay::{Relay, RelayMessage};
|
||||
use overwatch_rs::services::state::{NoOperator, NoState};
|
||||
|
@ -66,7 +66,7 @@ where
|
|||
Backend::DaBlob: Debug + Send,
|
||||
Backend::Error: Error + Send + Sync,
|
||||
Backend::Settings: Clone,
|
||||
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
|
||||
N: NetworkAdapter<Blob = Backend::DaBlob> + Send + 'static,
|
||||
N::Settings: Clone,
|
||||
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + 'static,
|
||||
{
|
||||
|
@ -124,7 +124,7 @@ where
|
|||
Backend::Settings: Clone + Send + Sync + 'static,
|
||||
Backend::DaBlob: Debug + Send + Sync + 'static,
|
||||
Backend::Error: Error + Send + Sync + 'static,
|
||||
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
|
||||
N: NetworkAdapter<Blob = Backend::DaBlob> + Send + Sync + 'static,
|
||||
N::Settings: Clone + Send + Sync + 'static,
|
||||
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = ()> + Send + Sync + 'static,
|
||||
S::Settings: Clone + Send + Sync + 'static,
|
||||
|
@ -173,27 +173,24 @@ where
|
|||
loop {
|
||||
tokio::select! {
|
||||
Some(blob) = blob_stream.next() => {
|
||||
match Self::handle_new_blob(&verifier,&storage_adapter, &blob).await {
|
||||
Ok(attestation) => if let Err(err) = network_adapter.send_attestation(attestation).await {
|
||||
error!("Error replying attestation {err:?}");
|
||||
},
|
||||
Err(err) => error!("Error handling blob {blob:?} due to {err:?}"),
|
||||
if let Err(err) = Self::handle_new_blob(&verifier,&storage_adapter, &blob).await {
|
||||
error!("Error handling blob {blob:?} due to {err:?}");
|
||||
}
|
||||
}
|
||||
Some(msg) = service_state.inbound_relay.recv() => {
|
||||
let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
|
||||
match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
|
||||
Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
|
||||
let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
|
||||
match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
|
||||
Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
|
||||
error!("Error replying attestation {err:?}");
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Error handling blob {blob:?} due to {err:?}");
|
||||
if let Err(err) = reply_channel.send(None) {
|
||||
error!("Error replying attestation {err:?}");
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Error handling blob {blob:?} due to {err:?}");
|
||||
if let Err(err) = reply_channel.send(None) {
|
||||
error!("Error replying attestation {err:?}");
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
Some(msg) = lifecycle_stream.next() => {
|
||||
if Self::should_stop_service(msg).await {
|
||||
break;
|
||||
|
|
|
@ -1,107 +1,79 @@
|
|||
// std
|
||||
use futures::Stream;
|
||||
use overwatch_rs::DynError;
|
||||
use std::fmt::Debug;
|
||||
use std::marker::PhantomData;
|
||||
// crates
|
||||
|
||||
// internal
|
||||
use crate::network::NetworkAdapter;
|
||||
use nomos_core::wire;
|
||||
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
|
||||
use nomos_network::{NetworkMsg, NetworkService};
|
||||
use futures::Stream;
|
||||
use kzgrs_backend::common::blob::DaBlob;
|
||||
use libp2p::PeerId;
|
||||
use nomos_da_network_core::SubnetworkId;
|
||||
use nomos_da_network_service::backends::libp2p::validator::{
|
||||
DaNetworkEvent, DaNetworkEventKind, DaNetworkValidatorBackend,
|
||||
};
|
||||
use nomos_da_network_service::NetworkService;
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use subnetworks_assignations::MembershipHandler;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::debug;
|
||||
// internal
|
||||
use crate::network::NetworkAdapter;
|
||||
|
||||
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
|
||||
|
||||
pub struct Libp2pAdapter<B, A> {
|
||||
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||
_blob: PhantomData<B>,
|
||||
_attestation: PhantomData<A>,
|
||||
}
|
||||
|
||||
impl<B, A> Libp2pAdapter<B, A>
|
||||
pub struct Libp2pAdapter<M>
|
||||
where
|
||||
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
async fn stream_for<E: DeserializeOwned>(&self) -> Box<dyn Stream<Item = E> + Unpin + Send> {
|
||||
let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC);
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
self.network_relay
|
||||
.send(NetworkMsg::Subscribe {
|
||||
kind: EventKind::Message,
|
||||
sender,
|
||||
})
|
||||
.await
|
||||
.expect("Network backend should be ready");
|
||||
let receiver = receiver.await.unwrap();
|
||||
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
|
||||
move |msg| match msg {
|
||||
Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => {
|
||||
match wire::deserialize::<E>(&data) {
|
||||
Ok(msg) => Some(msg),
|
||||
Err(e) => {
|
||||
debug!("Unrecognized message: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
)))
|
||||
}
|
||||
|
||||
async fn send<E: Serialize>(&self, data: E) -> Result<(), DynError> {
|
||||
let message = wire::serialize(&data)?.into_boxed_slice();
|
||||
self.network_relay
|
||||
.send(NetworkMsg::Process(Command::Broadcast {
|
||||
topic: NOMOS_DA_TOPIC.to_string(),
|
||||
message,
|
||||
}))
|
||||
.await
|
||||
.map_err(|(e, _)| Box::new(e) as DynError)
|
||||
}
|
||||
network_relay:
|
||||
OutboundRelay<<NetworkService<DaNetworkValidatorBackend<M>> as ServiceData>::Message>,
|
||||
_membership: PhantomData<M>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
|
||||
impl<M> NetworkAdapter for Libp2pAdapter<M>
|
||||
where
|
||||
B: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
A: Serialize + DeserializeOwned + Send + Sync + 'static,
|
||||
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
|
||||
+ Clone
|
||||
+ Debug
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
{
|
||||
type Backend = Libp2p;
|
||||
type Backend = DaNetworkValidatorBackend<M>;
|
||||
type Settings = ();
|
||||
type Blob = B;
|
||||
type Attestation = A;
|
||||
type Blob = DaBlob;
|
||||
|
||||
async fn new(
|
||||
_settings: Self::Settings,
|
||||
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||
) -> Self {
|
||||
network_relay
|
||||
.send(NetworkMsg::Process(Command::Subscribe(
|
||||
NOMOS_DA_TOPIC.to_string(),
|
||||
)))
|
||||
.await
|
||||
.expect("Network backend should be ready");
|
||||
Self {
|
||||
network_relay,
|
||||
_blob: Default::default(),
|
||||
_attestation: Default::default(),
|
||||
_membership: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send> {
|
||||
self.stream_for::<Self::Blob>().await
|
||||
}
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
self.network_relay
|
||||
.send(nomos_da_network_service::DaNetworkMsg::Subscribe {
|
||||
kind: DaNetworkEventKind::Verifying,
|
||||
sender,
|
||||
})
|
||||
.await
|
||||
.expect("Network backend should be ready");
|
||||
|
||||
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> {
|
||||
self.send(attestation).await
|
||||
let receiver = receiver.await.expect("Blob stream should be received");
|
||||
|
||||
let stream = receiver.filter_map(move |msg| match msg {
|
||||
DaNetworkEvent::Verifying(blob) => Some(*blob),
|
||||
_ => None,
|
||||
});
|
||||
|
||||
Box::new(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,19 +1,17 @@
|
|||
pub mod adapters;
|
||||
|
||||
use futures::Stream;
|
||||
use nomos_network::backends::NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use nomos_da_network_service::backends::NetworkBackend;
|
||||
use nomos_da_network_service::NetworkService;
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use overwatch_rs::DynError;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait NetworkAdapter {
|
||||
type Backend: NetworkBackend + 'static;
|
||||
type Backend: NetworkBackend + Send + 'static;
|
||||
type Settings;
|
||||
|
||||
type Blob;
|
||||
type Attestation;
|
||||
|
||||
async fn new(
|
||||
settings: Self::Settings,
|
||||
|
@ -21,5 +19,4 @@ pub trait NetworkAdapter {
|
|||
) -> Self;
|
||||
|
||||
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
|
||||
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue