DA: Incoming sampling request handling (#730)
* WIP: Sample message to behaviour message translation
* Sampling request handling
* Sampling storage in node
* Sampling rocksdb scaffold
parent b31e867f7f
commit f8a73529cf
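For orientation before the diff: the core of this change is a translation layer between the wire-level sampling messages and new behaviour-level types, plus a storage lookup used to answer incoming requests. The sketch below is not part of the commit; it only reuses types that the diff itself introduces, and shows the intended round trip for one incoming request.

// Sketch, assuming the types introduced in the diff below; not part of the commit.
use kzgrs_backend::common::blob::DaBlob;
use nomos_da_messages::sampling::{SampleReq, SampleRes};
use nomos_da_network_core::protocols::sampling::behaviour::{BehaviourSampleReq, BehaviourSampleRes};
use nomos_da_network_core::SubnetworkId;

// 1. A wire request is converted into the behaviour-level request; the
//    conversion fails with the raw bytes if the blob id is not 32 bytes long.
fn to_behaviour_req(req: SampleReq) -> Result<BehaviourSampleReq, Vec<u8>> {
    BehaviourSampleReq::try_from(req)
}

// 2. The sampling service looks the blob up in storage and answers with a
//    behaviour-level response, which converts back into the wire response
//    that the behaviour packs onto the libp2p stream.
fn to_wire_res(blob_id: [u8; 32], subnetwork_id: SubnetworkId, found: Option<DaBlob>) -> SampleRes {
    match found {
        Some(blob) => BehaviourSampleRes::SamplingSuccess {
            blob_id,
            subnetwork_id,
            blob: Box::new(blob),
        }
        .into(),
        None => BehaviourSampleRes::SampleNotFound { blob_id }.into(),
    }
}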
@@ -23,7 +23,7 @@ overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d
tracing = "0.1"
multiaddr = "0.18"
nomos-core = { path = "../../nomos-core" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling" }
nomos-da-sampling = { path = "../../nomos-services/data-availability/sampling", features = ["rocksdb-backend"] }
nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", features = ["rocksdb-backend", "libp2p"] }
nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
@@ -64,6 +64,7 @@ pub struct AxumBackend<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> {
settings: AxumBackendSettings,
@@ -78,6 +79,7 @@ pub struct AxumBackend<
_sampling_backend: core::marker::PhantomData<SamplingBackend>,
_sampling_network_adapter: core::marker::PhantomData<SamplingNetworkAdapter>,
_sampling_rng: core::marker::PhantomData<SamplingRng>,
_sampling_storage: core::marker::PhantomData<SamplingStorage>,
}

#[derive(OpenApi)]
@@ -106,6 +108,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> Backend
for AxumBackend<
@@ -120,6 +123,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>
where
@@ -182,6 +186,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter + Send + 'static,
{
type Error = hyper::Error;
type Settings = AxumBackendSettings;
@@ -203,6 +208,7 @@ where
_sampling_backend: core::marker::PhantomData,
_sampling_network_adapter: core::marker::PhantomData,
_sampling_rng: core::marker::PhantomData,
_sampling_storage: core::marker::PhantomData,
})
}

@@ -240,6 +246,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
@@ -253,6 +260,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
@@ -269,6 +277,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>,
),
@@ -279,7 +288,13 @@ where
.route(
"/mempool/add/blobinfo",
routing::post(
add_blob_info::<V, SamplingBackend, SamplingNetworkAdapter, SamplingRng>,
add_blob_info::<
V,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
),
)
.route("/metrics", routing::get(get_metrics))
@@ -370,6 +385,7 @@ async fn cryptarchia_info<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
@@ -393,6 +409,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
make_request_and_return_response!(consensus::cryptarchia_info::<
Tx,
@@ -400,6 +417,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>(&handle))
}
@@ -418,6 +436,7 @@ async fn cryptarchia_headers<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
State(store): State<OverwatchHandle>,
@@ -442,6 +461,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let QueryParams { from, to } = query;
make_request_and_return_response!(consensus::cryptarchia_headers::<
@@ -450,6 +470,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>(&store, from, to))
}
@@ -511,6 +532,7 @@ async fn get_range<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
State(handle): State<OverwatchHandle>,
@@ -561,6 +583,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
make_request_and_return_response!(da::get_range::<
Tx,
@@ -570,6 +593,7 @@ where
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>(&handle, app_id, range))
}
@@ -631,7 +655,7 @@ where
(status = 500, description = "Internal server error", body = String),
)
)]
async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng>(
async fn add_blob_info<B, SamplingBackend, SamplingAdapter, SamplingRng, SamplingStorage>(
State(handle): State<OverwatchHandle>,
Json(blob_info): Json<B>,
) -> Response
@@ -654,6 +678,7 @@ where
SamplingBackend::BlobId: Debug + 'static,
SamplingAdapter: nomos_da_sampling::network::NetworkAdapter + Send + 'static,
SamplingRng: SeedableRng + RngCore + Send + 'static,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
make_request_and_return_response!(mempool::add_blob_info::<
NetworkBackend,
@@ -663,6 +688,7 @@ where
SamplingBackend,
SamplingAdapter,
SamplingRng,
SamplingStorage,
>(&handle, blob_info, DispersedBlobInfo::blob_id))
}
@@ -23,6 +23,7 @@ use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBac
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackend;
use nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter as SamplingLibp2pAdapter;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_sampling::DaSamplingService;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::network::adapters::libp2p::Libp2pAdapter as VerifierNetworkAdapter;
@@ -66,6 +67,7 @@ pub type NomosApiService = ApiService<
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
MB16,
>,
>;
@@ -86,6 +88,7 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub type TxMempool = TxMempoolService<
@@ -99,6 +102,7 @@ pub type DaMempool = DaMempoolService<
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub type DaIndexer = DataIndexerService<
@@ -118,12 +122,14 @@ pub type DaIndexer = DataIndexerService<
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub type DaSampling = DaSamplingService<
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub type DaVerifier = DaVerifierService<
@@ -17,8 +17,10 @@ use libp2p::swarm::{
};
use libp2p::{Multiaddr, PeerId, Stream};
use libp2p_stream::{Control, IncomingStreams, OpenStreamError};
use nomos_core::da::BlobId;
use nomos_da_messages::sampling::sample_err::SampleErrType;
use nomos_da_messages::sampling::{sample_res, SampleErr, SampleReq, SampleRes};
use nomos_da_messages::{pack_message, unpack_from_reader};
use nomos_da_messages::{common, pack_message, unpack_from_reader};
use subnetworks_assignations::MembershipHandler;
use thiserror::Error;
use tokio::sync::mpsc;
@@ -57,7 +59,14 @@ pub enum SamplingError {
error: bincode::Error,
},
#[error("Error sending request: {request:?}")]
RequestChannel { request: SampleReq, peer_id: PeerId },
RequestChannel {
request: BehaviourSampleReq,
peer_id: PeerId,
},
#[error("Malformed blob id: {blob_id:?}")]
InvalidBlobId { peer_id: PeerId, blob_id: Vec<u8> },
#[error("Blob not found: {blob_id:?}")]
BlobNotFound { peer_id: PeerId, blob_id: Vec<u8> },
#[error("Canceled response: {error}")]
ResponseChannel { error: Canceled, peer_id: PeerId },
}
@@ -71,6 +80,8 @@ impl SamplingError {
SamplingError::Deserialize { peer_id, .. } => peer_id,
SamplingError::RequestChannel { peer_id, .. } => peer_id,
SamplingError::ResponseChannel { peer_id, .. } => peer_id,
SamplingError::InvalidBlobId { peer_id, .. } => peer_id,
SamplingError::BlobNotFound { peer_id, .. } => peer_id,
}
}

@@ -132,13 +143,64 @@ impl Clone for SamplingError {
peer_id: *peer_id,
error: *error,
},
SamplingError::InvalidBlobId { blob_id, peer_id } => SamplingError::InvalidBlobId {
peer_id: *peer_id,
blob_id: blob_id.clone(),
},
SamplingError::BlobNotFound { blob_id, peer_id } => SamplingError::BlobNotFound {
peer_id: *peer_id,
blob_id: blob_id.clone(),
},
}
}
}

/// Inner type representation of a Blob ID
// TODO: Use a proper type that is common to the codebase
type BlobId = [u8; 32];
#[derive(Debug, Clone)]
pub struct BehaviourSampleReq {
pub blob_id: BlobId,
}

impl TryFrom<SampleReq> for BehaviourSampleReq {
type Error = Vec<u8>;

fn try_from(req: SampleReq) -> Result<Self, Self::Error> {
let blob_id: BlobId = req.blob_id.try_into()?;
Ok(Self { blob_id })
}
}

#[derive(Debug)]
pub enum BehaviourSampleRes {
SamplingSuccess {
blob_id: BlobId,
subnetwork_id: SubnetworkId,
blob: Box<DaBlob>,
},
SampleNotFound {
blob_id: BlobId,
},
}

impl From<BehaviourSampleRes> for SampleRes {
fn from(res: BehaviourSampleRes) -> Self {
match res {
BehaviourSampleRes::SamplingSuccess { blob, blob_id, .. } => SampleRes {
message_type: Some(sample_res::MessageType::Blob(common::Blob {
blob_id: blob_id.to_vec(),
data: bincode::serialize(&blob)
.expect("Blob from service should be serializable"),
})),
},
BehaviourSampleRes::SampleNotFound { blob_id } => SampleRes {
message_type: Some(sample_res::MessageType::Err(SampleErr {
blob_id: blob_id.into(),
err_type: SampleErrType::NotFound.into(),
err_description: "Sample not found".to_string(),
})),
},
}
}
}

#[derive(Debug)]
pub enum SamplingEvent {
@@ -149,8 +211,8 @@ pub enum SamplingEvent {
blob: Box<DaBlob>,
},
IncomingSample {
request_receiver: Receiver<SampleReq>,
response_sender: Sender<SampleRes>,
request_receiver: Receiver<BehaviourSampleReq>,
response_sender: Sender<BehaviourSampleRes>,
},
SamplingError {
error: SamplingError,
@@ -165,8 +227,8 @@ struct SampleStream {

/// Auxiliary struct that binds where to send a request and the pair channel to listen for a response
struct ResponseChannel {
request_sender: Sender<SampleReq>,
response_receiver: Receiver<SampleRes>,
request_sender: Sender<BehaviourSampleReq>,
response_receiver: Receiver<BehaviourSampleRes>,
}

type StreamHandlerFutureSuccess = (BlobId, SubnetworkId, SampleRes, SampleStream);
@@ -350,6 +412,12 @@ where
peer_id: stream.peer_id,
error,
})?;
let request = BehaviourSampleReq::try_from(request).map_err(|blob_id| {
SamplingError::InvalidBlobId {
peer_id: stream.peer_id,
blob_id,
}
})?;
channel
.request_sender
.send(request)
@@ -357,14 +425,14 @@ where
request,
peer_id: stream.peer_id,
})?;
let response =
channel
.response_receiver
.await
.map_err(|error| SamplingError::ResponseChannel {
error,
peer_id: stream.peer_id,
})?;
let response: SampleRes = channel
.response_receiver
.await
.map_err(|error| SamplingError::ResponseChannel {
error,
peer_id: stream.peer_id,
})?
.into();
let bytes = pack_message(&response).map_err(|error| SamplingError::Io {
peer_id: stream.peer_id,
error,
@@ -394,7 +462,7 @@ where
fn schedule_incoming_stream_task(
incoming_tasks: &mut FuturesUnordered<IncomingStreamHandlerFuture>,
sample_stream: SampleStream,
) -> (Receiver<SampleReq>, Sender<SampleRes>) {
) -> (Receiver<BehaviourSampleReq>, Sender<BehaviourSampleRes>) {
let (request_sender, request_receiver) = oneshot::channel();
let (response_sender, response_receiver) = oneshot::channel();
let channel = ResponseChannel {
@@ -3,7 +3,9 @@ pub mod behaviour;
#[cfg(test)]
mod test {
use crate::address_book::AddressBook;
use crate::protocols::sampling::behaviour::{SamplingBehaviour, SamplingEvent};
use crate::protocols::sampling::behaviour::{
BehaviourSampleRes, SamplingBehaviour, SamplingEvent,
};
use crate::test_utils::AllNeighbours;
use crate::SubnetworkId;
use futures::StreamExt;
@@ -13,8 +15,6 @@ mod test {
use libp2p::swarm::SwarmEvent;
use libp2p::{quic, Multiaddr, PeerId, Swarm, SwarmBuilder};
use log::debug;
use nomos_da_messages::common::Blob;
use nomos_da_messages::sampling::SampleRes;
use std::time::Duration;
use subnetworks_assignations::MembershipHandler;
use tracing_subscriber::fmt::TestWriter;
@@ -103,24 +103,18 @@ mod test {
// spawn here because otherwise we block polling
tokio::spawn(request_receiver);
response_sender
.send(SampleRes {
message_type: Some(
nomos_da_messages::sampling::sample_res::MessageType::Blob(
Blob {
blob_id: vec![],
data: bincode::serialize(&DaBlob {
column_idx: 0,
column: Column(vec![]),
column_commitment: Default::default(),
aggregated_column_commitment: Default::default(),
aggregated_column_proof: Default::default(),
rows_commitments: vec![],
rows_proofs: vec![],
})
.unwrap(),
},
),
),
.send(BehaviourSampleRes::SamplingSuccess {
blob_id: Default::default(),
subnetwork_id: Default::default(),
blob: Box::new(DaBlob {
column: Column(vec![]),
column_idx: 0,
column_commitment: Default::default(),
aggregated_column_commitment: Default::default(),
aggregated_column_proof: Default::default(),
rows_commitments: vec![],
rows_proofs: vec![],
}),
})
.unwrap()
}
@@ -31,6 +31,7 @@ pub type Cryptarchia<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, BlobInfo>,
@@ -44,6 +45,7 @@ pub type Cryptarchia<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>;

pub async fn cryptarchia_info<
@@ -52,6 +54,7 @@ pub async fn cryptarchia_info<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
handle: &OverwatchHandle,
@@ -75,9 +78,18 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let relay = handle
.relay::<Cryptarchia<Tx, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.relay::<Cryptarchia<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();
@@ -95,6 +107,7 @@ pub async fn cryptarchia_headers<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
handle: &OverwatchHandle,
@@ -120,9 +133,18 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let relay = handle
.relay::<Cryptarchia<Tx, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.relay::<Cryptarchia<
Tx,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();
@@ -40,6 +40,7 @@ pub type DaIndexer<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
> = DataIndexerService<
// Indexer specific.
@@ -58,6 +59,7 @@ pub type DaIndexer<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>;

pub type DaVerifier<A, B, M, VB, SS> =
@@ -107,6 +109,7 @@ pub async fn get_range<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
const SIZE: usize,
>(
handle: &OverwatchHandle,
@@ -157,9 +160,20 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let relay = handle
.relay::<DaIndexer<Tx, C, V, SS, SamplingBackend, SamplingNetworkAdapter, SamplingRng, SIZE>>()
.relay::<DaIndexer<
Tx,
C,
V,
SS,
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
SIZE,
>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();
@@ -45,7 +45,16 @@ where
}
}

pub async fn add_blob_info<N, A, Item, Key, SamplingBackend, SamplingAdapter, SamplingRng>(
pub async fn add_blob_info<
N,
A,
Item,
Key,
SamplingBackend,
SamplingAdapter,
SamplingRng,
SamplingStorage,
>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
item: A::Payload,
converter: impl Fn(&A::Payload) -> Key,
@@ -63,6 +72,7 @@ where
SamplingBackend::Settings: Clone,
SamplingAdapter: DaSamplingNetworkAdapter + Send,
SamplingRng: SeedableRng + RngCore + Send,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
let relay = handle
.relay::<DaMempoolService<
@@ -71,6 +81,7 @@ where
SamplingBackend,
SamplingAdapter,
SamplingRng,
SamplingStorage,
>>()
.connect()
.await?;
@@ -152,6 +152,7 @@ pub struct CryptarchiaConsensus<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> where
A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
@@ -173,6 +174,7 @@ pub struct CryptarchiaConsensus<
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly
@@ -186,9 +188,12 @@ pub struct CryptarchiaConsensus<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
>,
sampling_relay: Relay<DaSamplingService<SamplingBackend, SamplingNetworkAdapter, SamplingRng>>,
sampling_relay: Relay<
DaSamplingService<SamplingBackend, SamplingNetworkAdapter, SamplingRng, SamplingStorage>,
>,
block_subscription_sender: broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
storage_relay: Relay<StorageService<Storage>>,
}
@@ -205,6 +210,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> ServiceData
for CryptarchiaConsensus<
A,
@@ -218,6 +224,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
A: NetworkAdapter,
@@ -239,6 +246,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
const SERVICE_ID: ServiceId = CRYPTARCHIA_ID;
type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>;
@@ -260,6 +268,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> ServiceCore
for CryptarchiaConsensus<
A,
@@ -273,6 +282,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
A: NetworkAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
@@ -323,6 +333,7 @@ where
SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static,

SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
@@ -481,6 +492,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
CryptarchiaConsensus<
A,
@@ -494,6 +506,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
@@ -536,6 +549,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + Ord + Send + Sync + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
@@ -42,6 +42,7 @@ pub type ConsensusRelay<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> = Relay<
CryptarchiaConsensus<
A,
@@ -55,6 +56,7 @@ pub type ConsensusRelay<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
>;

@@ -73,6 +75,7 @@ pub struct DataIndexerService<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> where
B: 'static,
A: NetworkAdapter,
@@ -96,6 +99,7 @@ pub struct DataIndexerService<
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
service_state: ServiceStateHandle<Self>,
storage_relay: Relay<StorageService<DaStorage::Backend>>,
@@ -111,6 +115,7 @@ pub struct DataIndexerService<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>,
}

@@ -155,6 +160,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> ServiceData
for DataIndexerService<
B,
@@ -171,6 +177,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
B: 'static,
@@ -195,6 +202,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
const SERVICE_ID: ServiceId = "DaIndexer";
type Settings = IndexerSettings<DaStorage::Settings>;
@@ -218,6 +226,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
DataIndexerService<
B,
@@ -234,6 +243,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
B: Send + Sync + 'static,
@@ -259,6 +269,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
async fn handle_new_block(
storage_adapter: &DaStorage,
@@ -323,6 +334,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
> ServiceCore
for DataIndexerService<
B,
@@ -339,6 +351,7 @@ impl<
SamplingBackend,
SamplingNetworkAdapter,
SamplingRng,
SamplingStorage,
>
where
B: Debug + Send + Sync,
@@ -387,6 +400,7 @@ where
SamplingBackend::Blob: Debug + 'static,
SamplingBackend::BlobId: Debug + 'static,
SamplingNetworkAdapter: nomos_da_sampling::network::NetworkAdapter,
SamplingStorage: nomos_da_sampling::storage::DaStorageAdapter,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let consensus_relay = service_state.overwatch_handle.relay();
@@ -6,7 +6,9 @@ use libp2p::{Multiaddr, PeerId};
use log::error;
use nomos_core::da::BlobId;
use nomos_da_network_core::protocols::sampling;
use nomos_da_network_core::protocols::sampling::behaviour::SamplingError;
use nomos_da_network_core::protocols::sampling::behaviour::{
BehaviourSampleReq, BehaviourSampleRes, SamplingError,
};
use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm};
use nomos_da_network_core::SubnetworkId;
use nomos_libp2p::secret_key_serde;
@@ -17,9 +19,9 @@ use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;

@@ -49,6 +51,11 @@ pub enum DaNetworkEventKind {
pub enum SamplingEvent {
/// A success sampling
SamplingSuccess { blob_id: BlobId, blob: Box<DaBlob> },
/// Incoming sampling request
SamplingRequest {
blob_id: BlobId,
response_sender: mpsc::Sender<Option<DaBlob>>,
},
/// A failed sampling error
SamplingError { error: SamplingError },
}
@@ -214,8 +221,37 @@ async fn handle_validator_events_stream(
error!("Error in internal broadcast of sampling success: {e:?}");
}
}
sampling::behaviour::SamplingEvent::IncomingSample{ .. } => {
unimplemented!("Handle request/response from Sampling service");
sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => {
if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await {
let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1);

if let Err(e) = sampling_broadcast_sender
.send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender })
{
error!("Error in internal broadcast of sampling request: {e:?}");
sampling_response_receiver.close()
}

if let Some(maybe_blob) = sampling_response_receiver.recv().await {
let result = match maybe_blob {
Some(blob) => BehaviourSampleRes::SamplingSuccess {
blob_id,
subnetwork_id: blob.column_idx as u32,
blob: Box::new(blob),
},
None => BehaviourSampleRes::SampleNotFound { blob_id },
};

if response_sender.send(result).is_err() {
error!("Error sending sampling success response");
}
} else if response_sender
.send(BehaviourSampleRes::SampleNotFound { blob_id })
.is_err()
{
error!("Error sending sampling success response");
}
}
}
sampling::behaviour::SamplingEvent::SamplingError{ error } => {
if let Err(e) = sampling_broadcast_sender.send(SamplingEvent::SamplingError {error}) {
@@ -13,6 +13,7 @@ libp2p-identity = { version = "0.2" }
nomos-core = { path = "../../../nomos-core" }
nomos-da-network-core = { path = "../../../nomos-da/network/core" }
nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-storage = { path = "../../../nomos-services/storage" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
@@ -28,3 +29,4 @@ rand_chacha = "0.3.1"
[features]
default = ["libp2p"]
libp2p = []
rocksdb-backend = ["nomos-storage/rocksdb-backend"]
@@ -1,29 +1,31 @@
pub mod backend;
pub mod network;
pub mod storage;

// std
use std::collections::BTreeSet;
use std::fmt::Debug;

// crates
use rand::prelude::*;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
// internal
use backend::{DaSamplingServiceBackend, SamplingState};
use kzgrs_backend::common::blob::DaBlob;
use network::NetworkAdapter;
use nomos_core::da::BlobId;
use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
use nomos_da_network_service::NetworkService;
use nomos_storage::StorageService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use rand::{RngCore, SeedableRng};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::{error, span, Instrument, Level};
// internal
use backend::{DaSamplingServiceBackend, SamplingState};
use storage::DaStorageAdapter;

const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";

@@ -41,35 +43,40 @@ pub enum DaSamplingServiceMsg<BlobId> {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings> {
pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings, StorageSettings> {
pub sampling_settings: BackendSettings,
pub network_adapter_settings: NetworkSettings,
pub storage_adapter_settings: StorageSettings,
}

impl<B: 'static> RelayMessage for DaSamplingServiceMsg<B> {}

pub struct DaSamplingService<Backend, N, R>
pub struct DaSamplingService<Backend, DaNetwork, SamplingRng, DaStorage>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R> + Send,
SamplingRng: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<SamplingRng> + Send,
Backend::Settings: Clone,
Backend::Blob: Debug + 'static,
Backend::BlobId: Debug + 'static,
N: NetworkAdapter,
N::Settings: Clone,
DaNetwork: NetworkAdapter,
DaNetwork::Settings: Clone,
DaStorage: DaStorageAdapter,
{
network_relay: Relay<NetworkService<N::Backend>>,
network_relay: Relay<NetworkService<DaNetwork::Backend>>,
storage_relay: Relay<StorageService<DaStorage::Backend>>,
service_state: ServiceStateHandle<Self>,
sampler: Backend,
}

impl<Backend, N, R> DaSamplingService<Backend, N, R>
impl<Backend, DaNetwork, SamplingRng, DaStorage>
DaSamplingService<Backend, DaNetwork, SamplingRng, DaStorage>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + 'static,
SamplingRng: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<SamplingRng, BlobId = BlobId, Blob = DaBlob> + Send + 'static,
Backend::Settings: Clone,
N: NetworkAdapter + Send + 'static,
N::Settings: Clone,
DaNetwork: NetworkAdapter + Send + 'static,
DaNetwork::Settings: Clone,
DaStorage: DaStorageAdapter<Blob = DaBlob>,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
@@ -88,7 +95,7 @@ where

async fn handle_service_message(
msg: <Self as ServiceData>::Message,
network_adapter: &mut N,
network_adapter: &mut DaNetwork,
sampler: &mut Backend,
) {
match msg {
@@ -117,7 +124,11 @@ where
}
}

async fn handle_sampling_message(event: SamplingEvent, sampler: &mut Backend) {
async fn handle_sampling_message(
event: SamplingEvent,
sampler: &mut Backend,
storage_adapter: &DaStorage,
) {
match event {
SamplingEvent::SamplingSuccess { blob_id, blob } => {
sampler.handle_sampling_success(blob_id, *blob).await;
@@ -129,35 +140,59 @@ where
}
error!("Error while sampling: {error}");
}
SamplingEvent::SamplingRequest {
blob_id,
response_sender,
} => match storage_adapter.get_blob(blob_id).await {
Ok(maybe_blob) => {
if response_sender.send(maybe_blob).await.is_err() {
error!("Error sending sampling response");
}
}
Err(error) => {
error!("Failed to get blob from storage adapter: {error}");
if response_sender.send(None).await.is_err() {
error!("Error sending sampling response");
}
}
},
}
}
}

impl<Backend, N, R> ServiceData for DaSamplingService<Backend, N, R>
impl<Backend, DaNetwork, SamplingRng, DaStorage> ServiceData
for DaSamplingService<Backend, DaNetwork, SamplingRng, DaStorage>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R> + Send,
SamplingRng: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<SamplingRng> + Send,
Backend::Settings: Clone,
Backend::Blob: Debug + 'static,
Backend::BlobId: Debug + 'static,
N: NetworkAdapter,
N::Settings: Clone,
DaNetwork: NetworkAdapter,
DaNetwork::Settings: Clone,
DaStorage: DaStorageAdapter,
{
const SERVICE_ID: ServiceId = DA_SAMPLING_TAG;
type Settings = DaSamplingServiceSettings<Backend::Settings, N::Settings>;
type Settings =
DaSamplingServiceSettings<Backend::Settings, DaNetwork::Settings, DaStorage::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaSamplingServiceMsg<Backend::BlobId>;
}

#[async_trait::async_trait]
impl<Backend, N, R> ServiceCore for DaSamplingService<Backend, N, R>
impl<Backend, DaNetwork, SamplingRng, DaStorage> ServiceCore
for DaSamplingService<Backend, DaNetwork, SamplingRng, DaStorage>
where
R: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + Sync + 'static,
SamplingRng: SeedableRng + RngCore,
Backend: DaSamplingServiceBackend<SamplingRng, BlobId = BlobId, Blob = DaBlob>
+ Send
+ Sync
+ 'static,
Backend::Settings: Clone + Send + Sync + 'static,
N: NetworkAdapter + Send + Sync + 'static,
N::Settings: Clone + Send + Sync + 'static,
DaNetwork: NetworkAdapter + Send + Sync + 'static,
DaNetwork::Settings: Clone + Send + Sync + 'static,
DaStorage: DaStorageAdapter<Blob = DaBlob> + Sync + Send,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let DaSamplingServiceSettings {
@@ -165,10 +200,12 @@ where
} = service_state.settings_reader.get_updated_settings();

let network_relay = service_state.overwatch_handle.relay();
let rng = R::from_entropy();
let storage_relay = service_state.overwatch_handle.relay();
let rng = SamplingRng::from_entropy();

Ok(Self {
network_relay,
storage_relay,
service_state,
sampler: Backend::new(sampling_settings, rng),
})
@@ -181,17 +218,24 @@ where
// position of his bls public key landing in the above-mentioned list.
let Self {
network_relay,
storage_relay,
mut service_state,
mut sampler,
} = self;
let DaSamplingServiceSettings { .. } = service_state.settings_reader.get_updated_settings();
let DaSamplingServiceSettings {
storage_adapter_settings,
..
} = service_state.settings_reader.get_updated_settings();

let network_relay = network_relay.connect().await?;
let mut network_adapter = N::new(network_relay).await;
let mut network_adapter = DaNetwork::new(network_relay).await;

let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?;
let mut next_prune_tick = sampler.prune_interval();

let storage_relay = storage_relay.connect().await?;
let storage_adapter = DaStorage::new(storage_adapter_settings, storage_relay).await;

let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
async {
loop {
@@ -200,7 +244,7 @@ where
Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await;
}
Some(sampling_message) = sampling_message_stream.next() => {
Self::handle_sampling_message(sampling_message, &mut sampler).await;
Self::handle_sampling_message(sampling_message, &mut sampler, &storage_adapter).await;
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
@@ -0,0 +1,2 @@
#[cfg(feature = "rocksdb-backend")]
pub mod rocksdb;
@@ -0,0 +1,59 @@
// std
use serde::{Deserialize, Serialize};
use std::{marker::PhantomData, path::PathBuf};
// crates
use nomos_core::da::blob::Blob;
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};
// internal
use crate::storage::DaStorageAdapter;

pub struct RocksAdapter<B, S>
where
S: StorageSerde + Send + Sync + 'static,
{
_settings: RocksAdapterSettings,
_storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
blob: PhantomData<B>,
}

#[async_trait::async_trait]
impl<B, S> DaStorageAdapter for RocksAdapter<B, S>
where
S: StorageSerde + Send + Sync + 'static,
B: Blob + Clone + Send + Sync + 'static,
B::BlobId: Send,
{
type Backend = RocksBackend<S>;
type Blob = B;
type Settings = RocksAdapterSettings;

async fn new(
_settings: Self::Settings,
_storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self {
_settings,
_storage_relay,
blob: PhantomData,
}
}

async fn get_blob(
&self,
_blob_id: <Self::Blob as Blob>::BlobId,
) -> Result<Option<Self::Blob>, DynError> {
todo!()
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RocksAdapterSettings {
pub blob_storage_directory: PathBuf,
}
nomos-services/data-availability/sampling/src/storage/mod.rs (new file, 25 lines)
@@ -0,0 +1,25 @@
pub mod adapters;

use nomos_core::da::blob::Blob;
use nomos_storage::{backends::StorageBackend, StorageService};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};

#[async_trait::async_trait]
pub trait DaStorageAdapter {
type Backend: StorageBackend + Send + Sync + 'static;
type Settings: Clone + Send + Sync + 'static;
type Blob: Blob + Clone;

async fn new(
settings: Self::Settings,
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self;

async fn get_blob(
&self,
blob_id: <Self::Blob as Blob>::BlobId,
) -> Result<Option<Self::Blob>, DynError>;
}
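The trait above only requires a constructor and a blob lookup, so alternative backends are easy to stub out. Below is a minimal sketch of an in-memory adapter, e.g. for tests; it is an assumption for illustration and not part of this commit. The hypothetical InMemoryAdapter borrows RocksBackend solely to satisfy the associated Backend type and never touches the storage relay.

// Hypothetical test-only adapter (not in the diff): serves blobs from a map.
use std::collections::HashMap;
use std::hash::Hash;
use std::marker::PhantomData;

use nomos_core::da::blob::Blob;
use nomos_storage::{
    backends::{rocksdb::RocksBackend, StorageSerde},
    StorageService,
};
use overwatch_rs::{
    services::{relay::OutboundRelay, ServiceData},
    DynError,
};

pub struct InMemoryAdapter<B: Blob, S> {
    blobs: HashMap<B::BlobId, B>,
    _serde: PhantomData<S>,
}

#[async_trait::async_trait]
impl<B, S> DaStorageAdapter for InMemoryAdapter<B, S>
where
    S: StorageSerde + Send + Sync + 'static,
    B: Blob + Clone + Send + Sync + 'static,
    B::BlobId: Hash + Eq + Send + Sync,
{
    type Backend = RocksBackend<S>; // unused here, only satisfies the associated type bound
    type Blob = B;
    type Settings = ();

    async fn new(
        _settings: Self::Settings,
        _storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
    ) -> Self {
        // The relay is ignored; blobs would be inserted directly by the test.
        Self { blobs: HashMap::new(), _serde: PhantomData }
    }

    async fn get_blob(
        &self,
        blob_id: <Self::Blob as Blob>::BlobId,
    ) -> Result<Option<Self::Blob>, DynError> {
        Ok(self.blobs.get(&blob_id).cloned())
    }
}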
@@ -5,6 +5,7 @@ use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transa
use nomos_da_indexer::consensus::adapters::cryptarchia::CryptarchiaConsensusAdapter;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapter as IndexerStorageAdapter;
use nomos_da_indexer::DataIndexerService;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStorageAdapter;
use nomos_da_sampling::DaSamplingService;
use nomos_da_sampling::{
backend::kzgrs::KzgrsSamplingBackend,
@@ -42,12 +43,14 @@ pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub type DaSampling = DaSamplingService<
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub(crate) type DaIndexer = DataIndexerService<
@@ -67,6 +70,7 @@ pub(crate) type DaIndexer = DataIndexerService<
KzgrsSamplingBackend<ChaCha20Rng>,
SamplingLibp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub(crate) type TxMempool = TxMempoolService<
@@ -80,6 +84,7 @@ pub type DaMempool = DaMempoolService<
KzgrsSamplingBackend<ChaCha20Rng>,
nomos_da_sampling::network::adapters::libp2p::Libp2pAdapter<NomosDaMembership>,
ChaCha20Rng,
SamplingStorageAdapter<DaBlob, Wire>,
>;

pub(crate) type DaVerifier = DaVerifierService<
@@ -21,6 +21,7 @@ use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_da_sampling::backend::kzgrs::KzgrsSamplingBackendSettings;
use nomos_da_sampling::network::adapters::libp2p::DaNetworkSamplingSettings;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageSettings;
use nomos_da_sampling::DaSamplingServiceSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageSettings;
@@ -164,6 +165,9 @@ fn new_node(
num_samples: 0,
subnet_size: 0,
},
storage_adapter_settings: SamplingStorageSettings {
blob_storage_directory: blobs_dir.clone(),
},
},
},
None,
@@ -17,8 +17,6 @@ use tokio_stream::StreamExt;
// internal
use crate::network::NetworkAdapter;

pub const NOMOS_DA_TOPIC: &str = "NomosDa";

pub struct Libp2pAdapter<M>
where
M: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
@@ -12,6 +12,7 @@ use std::fmt::Debug;
// #[cfg(feature = "metrics")]
// use super::metrics::Metrics;
use futures::StreamExt;
use nomos_da_sampling::storage::DaStorageAdapter;
use rand::{RngCore, SeedableRng};
// internal
use crate::backend::MemPool;
@@ -33,7 +34,7 @@ use overwatch_rs::services::{
};
use tracing::error;

pub struct DaMempoolService<N, P, DB, DN, R>
pub struct DaMempoolService<N, P, DB, DN, R, SamplingStorage>
where
N: NetworkAdapter<Key = P::Key>,
N::Payload: DispersedBlobInfo + Into<P::Item> + Debug + 'static,
@@ -47,18 +48,19 @@ where
DB::BlobId: Debug + 'static,
DB::Settings: Clone,
DN: DaSamplingNetworkAdapter,
SamplingStorage: DaStorageAdapter,
R: SeedableRng + RngCore,
{
service_state: ServiceStateHandle<Self>,
network_relay: Relay<NetworkService<N::Backend>>,
sampling_relay: Relay<DaSamplingService<DB, DN, R>>,
sampling_relay: Relay<DaSamplingService<DB, DN, R, SamplingStorage>>,
pool: P,
// TODO: Add again after metrics refactor
// #[cfg(feature = "metrics")]
// metrics: Option<Metrics>,
}

impl<N, P, DB, DN, R> ServiceData for DaMempoolService<N, P, DB, DN, R>
impl<N, P, DB, DN, R, DaStorage> ServiceData for DaMempoolService<N, P, DB, DN, R, DaStorage>
where
N: NetworkAdapter<Key = P::Key>,
N::Payload: DispersedBlobInfo + Debug + Into<P::Item> + 'static,
@@ -72,6 +74,7 @@ where
DB::BlobId: Debug + 'static,
DB::Settings: Clone,
DN: DaSamplingNetworkAdapter,
DaStorage: DaStorageAdapter,
R: SeedableRng + RngCore,
{
const SERVICE_ID: ServiceId = "mempool-da";
@@ -87,7 +90,7 @@ where
}

#[async_trait::async_trait]
impl<N, P, DB, DN, R> ServiceCore for DaMempoolService<N, P, DB, DN, R>
impl<N, P, DB, DN, R, DaStorage> ServiceCore for DaMempoolService<N, P, DB, DN, R, DaStorage>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
@@ -102,6 +105,7 @@ where
DB::BlobId: Debug + 'static,
DB::Settings: Clone,
DN: DaSamplingNetworkAdapter,
DaStorage: DaStorageAdapter,
R: SeedableRng + RngCore,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
@@ -178,7 +182,7 @@ where
}
}

impl<N, P, DB, DN, R> DaMempoolService<N, P, DB, DN, R>
impl<N, P, DB, DN, R, DaStorage> DaMempoolService<N, P, DB, DN, R, DaStorage>
where
P: MemPool + Send + 'static,
P::Settings: Clone + Send + Sync + 'static,
@@ -193,6 +197,7 @@ where
DB::BlobId: Debug + 'static,
DB::Settings: Clone,
DN: DaSamplingNetworkAdapter,
DaStorage: DaStorageAdapter,
R: SeedableRng + RngCore,
{
async fn should_stop_service(message: LifecycleMessage) -> bool {
@@ -22,6 +22,7 @@ use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as Indexe
use nomos_da_indexer::IndexerSettings;
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackendSettings;
use nomos_da_network_service::NetworkConfig as DaNetworkConfig;
use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapterSettings as SamplingStorageAdapterSettings;
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifierSettings;
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::DaVerifierServiceSettings;
@@ -130,7 +131,7 @@ impl NomosNode {

pub async fn get_block(&self, id: HeaderId) -> Option<Block<Tx, BlobInfo>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.post(format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
.header("Content-Type", "application/json")
.body(serde_json::to_string(&id).unwrap())
.send()
@@ -434,6 +435,9 @@ fn create_node_config(
num_samples: 0,
subnet_size: 0,
},
storage_adapter_settings: SamplingStorageAdapterSettings {
blob_storage_directory: "./".into(),
},
},
};