diff --git a/nomos-da/network/core/src/protocols/sampling/behaviour.rs b/nomos-da/network/core/src/protocols/sampling/behaviour.rs index 72d0568d..d8f813d3 100644 --- a/nomos-da/network/core/src/protocols/sampling/behaviour.rs +++ b/nomos-da/network/core/src/protocols/sampling/behaviour.rs @@ -9,6 +9,7 @@ use futures::future::BoxFuture; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{AsyncWriteExt, FutureExt, StreamExt}; use kzgrs_backend::common::blob::DaBlob; +use kzgrs_backend::common::ColumnIndex; use libp2p::core::Endpoint; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{ @@ -158,6 +159,7 @@ impl Clone for SamplingError { #[derive(Debug, Clone)] pub struct BehaviourSampleReq { pub blob_id: BlobId, + pub column_idx: ColumnIndex, } impl TryFrom for BehaviourSampleReq { @@ -165,7 +167,12 @@ impl TryFrom for BehaviourSampleReq { fn try_from(req: SampleReq) -> Result { let blob_id: BlobId = req.blob_id.try_into()?; - Ok(Self { blob_id }) + let column_idx = req.column_idx as u16; // TODO: This doesn't handle the overflow. + + Ok(Self { + blob_id, + column_idx, + }) } } @@ -386,6 +393,7 @@ where if let Some((subnetwork_id, blob_id)) = Self::next_request(&peer, to_sample) { let sample_request = SampleReq { blob_id: blob_id.to_vec(), + column_idx: subnetwork_id, }; outgoing_tasks .push(Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).boxed()); @@ -510,6 +518,7 @@ impl + 'sta let control = control.clone(); let sample_request = SampleReq { blob_id: blob_id.to_vec(), + column_idx: subnetwork_id, }; let with_dial_task: OutgoingStreamHandlerFuture = async move { let stream = Self::open_stream(peer, control).await?; diff --git a/nomos-da/network/messages/proto/sampling.proto b/nomos-da/network/messages/proto/sampling.proto index d077926c..23b38f74 100644 --- a/nomos-da/network/messages/proto/sampling.proto +++ b/nomos-da/network/messages/proto/sampling.proto @@ -19,6 +19,7 @@ message SampleErr { message SampleReq { bytes blob_id = 1; + uint32 column_idx = 2; } message SampleRes { diff --git a/nomos-da/storage/src/fs/mod.rs b/nomos-da/storage/src/fs/mod.rs index 6fd71bcc..923eaf0a 100644 --- a/nomos-da/storage/src/fs/mod.rs +++ b/nomos-da/storage/src/fs/mod.rs @@ -10,7 +10,7 @@ use tokio::{ // TODO: Rocksdb has a feature called BlobDB that handles largo blob storing, but further // investigation needs to be done to see if rust wrapper supports it. -pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Vec { +pub async fn load_blobs(base_dir: PathBuf, blob_id: &[u8]) -> Vec { let blob_id = hex::encode(blob_id); let mut path = base_dir; @@ -47,6 +47,26 @@ pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Vec { blobs } +pub async fn load_blob( + base_dir: PathBuf, + blob_id: &[u8], + column_idx: &[u8], +) -> Result { + let blob_id = hex::encode(blob_id); + let column_file = hex::encode(column_idx); + + let mut path = base_dir; + path.push(blob_id); + path.push(column_file); + + let mut file = tokio::fs::File::open(path).await?; + + let mut data = Vec::new(); + file.read_to_end(&mut data).await?; + + Ok(Bytes::from(data)) +} + pub async fn write_blob( base_dir: PathBuf, blob_id: &[u8], diff --git a/nomos-services/data-availability/indexer/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/indexer/src/storage/adapters/rocksdb.rs index 3c40273e..479b0b65 100644 --- a/nomos-services/data-availability/indexer/src/storage/adapters/rocksdb.rs +++ b/nomos-services/data-availability/indexer/src/storage/adapters/rocksdb.rs @@ -9,7 +9,7 @@ use nomos_core::da::blob::{ metadata::{Metadata, Next}, }; use nomos_core::da::BlobId; -use nomos_da_storage::fs::load_blob; +use nomos_da_storage::fs::load_blobs; use nomos_da_storage::rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX, DA_VID_KEY_PREFIX}; use nomos_storage::{ backends::{rocksdb::RocksBackend, StorageSerde}, @@ -128,7 +128,7 @@ where futures.push(async move { match reply_rx.await { - Ok(Some(id)) => (idx, load_blob(settings.blob_storage_directory, &id).await), + Ok(Some(id)) => (idx, load_blobs(settings.blob_storage_directory, &id).await), Ok(None) => (idx, Vec::new()), Err(_) => { tracing::error!("Failed to receive storage response"); diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index f60ad009..03f3f39e 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -1,6 +1,7 @@ use crate::backends::NetworkBackend; use futures::{Stream, StreamExt}; use kzgrs_backend::common::blob::DaBlob; +use kzgrs_backend::common::ColumnIndex; use libp2p::identity::ed25519; use libp2p::{Multiaddr, PeerId}; use log::error; @@ -54,6 +55,7 @@ pub enum SamplingEvent { /// Incoming sampling request SamplingRequest { blob_id: BlobId, + column_idx: ColumnIndex, response_sender: mpsc::Sender>, }, /// A failed sampling error @@ -222,11 +224,11 @@ async fn handle_validator_events_stream( } } sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => { - if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await { + if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await { let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1); if let Err(e) = sampling_broadcast_sender - .send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender }) + .send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender }) { error!("Error in internal broadcast of sampling request: {e:?}"); sampling_response_receiver.close() diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 18e880fd..1041230a 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -142,20 +142,23 @@ where } SamplingEvent::SamplingRequest { blob_id, + column_idx, response_sender, - } => match storage_adapter.get_blob(blob_id).await { - Ok(maybe_blob) => { - if response_sender.send(maybe_blob).await.is_err() { - error!("Error sending sampling response"); - } + } => { + let maybe_blob = storage_adapter + .get_blob(blob_id, column_idx) + .await + .map_err(|error| { + error!("Failed to get blob from storage adapter: {error}"); + None:: + }) + .ok() + .flatten(); + + if response_sender.send(maybe_blob).await.is_err() { + error!("Error sending sampling response"); } - Err(error) => { - error!("Failed to get blob from storage adapter: {error}"); - if response_sender.send(None).await.is_err() { - error!("Error sending sampling response"); - } - } - }, + } } } } diff --git a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs index 303754f6..8e0100e3 100644 --- a/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs +++ b/nomos-services/data-availability/sampling/src/storage/adapters/rocksdb.rs @@ -1,5 +1,6 @@ +use kzgrs_backend::common::ColumnIndex; // std -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{marker::PhantomData, path::PathBuf}; // crates use nomos_core::da::blob::Blob; @@ -27,8 +28,8 @@ where impl DaStorageAdapter for RocksAdapter where S: StorageSerde + Send + Sync + 'static, - B: Blob + Clone + Send + Sync + 'static, - B::BlobId: Send, + B: Blob + DeserializeOwned + Clone + Send + Sync + 'static, + B::BlobId: AsRef<[u8]> + Send, { type Backend = RocksBackend; type Blob = B; @@ -48,6 +49,7 @@ where async fn get_blob( &self, _blob_id: ::BlobId, + _column_idx: ColumnIndex, ) -> Result, DynError> { todo!() } diff --git a/nomos-services/data-availability/sampling/src/storage/mod.rs b/nomos-services/data-availability/sampling/src/storage/mod.rs index c0433f3f..69c1e318 100644 --- a/nomos-services/data-availability/sampling/src/storage/mod.rs +++ b/nomos-services/data-availability/sampling/src/storage/mod.rs @@ -1,5 +1,6 @@ pub mod adapters; +use kzgrs_backend::common::ColumnIndex; use nomos_core::da::blob::Blob; use nomos_storage::{backends::StorageBackend, StorageService}; use overwatch_rs::{ @@ -21,5 +22,6 @@ pub trait DaStorageAdapter { async fn get_blob( &self, blob_id: ::BlobId, + column_idx: ColumnIndex, ) -> Result, DynError>; }