Column index in the sample requests (#731)

parent f8a73529cf
commit 9835d09b8c

@@ -9,6 +9,7 @@ use futures::future::BoxFuture;
 use futures::stream::{BoxStream, FuturesUnordered};
 use futures::{AsyncWriteExt, FutureExt, StreamExt};
 use kzgrs_backend::common::blob::DaBlob;
+use kzgrs_backend::common::ColumnIndex;
 use libp2p::core::Endpoint;
 use libp2p::swarm::dial_opts::DialOpts;
 use libp2p::swarm::{

@@ -158,6 +159,7 @@ impl Clone for SamplingError {
 #[derive(Debug, Clone)]
 pub struct BehaviourSampleReq {
     pub blob_id: BlobId,
+    pub column_idx: ColumnIndex,
 }
 
 impl TryFrom<SampleReq> for BehaviourSampleReq {

@@ -165,7 +167,12 @@ impl TryFrom<SampleReq> for BehaviourSampleReq {
 
     fn try_from(req: SampleReq) -> Result<Self, Self::Error> {
         let blob_id: BlobId = req.blob_id.try_into()?;
-        Ok(Self { blob_id })
+        let column_idx = req.column_idx as u16; // TODO: This doesn't handle the overflow.
+
+        Ok(Self {
+            blob_id,
+            column_idx,
+        })
     }
 }
 
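Editor's note: the TODO above flags the truncating `as u16` cast (the wire carries a `u32`; see the protobuf change below). A minimal sketch of a checked alternative; the error enum here is hypothetical, not part of this commit:

```rust
// Sketch of an overflow-safe alternative to the `as u16` cast, assuming the
// failure can be surfaced through the TryFrom error type. The
// `ColumnIndexOutOfRange` variant is hypothetical, not part of this commit.
#[derive(Debug)]
enum ConversionError {
    ColumnIndexOutOfRange(u32),
}

fn checked_column_idx(raw: u32) -> Result<u16, ConversionError> {
    u16::try_from(raw).map_err(|_| ConversionError::ColumnIndexOutOfRange(raw))
}

fn main() {
    assert_eq!(checked_column_idx(7).unwrap(), 7);
    // 70_000 does not fit in u16; `as u16` would silently truncate it to 4_464.
    assert!(checked_column_idx(70_000).is_err());
}
```
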
@@ -386,6 +393,7 @@ where
         if let Some((subnetwork_id, blob_id)) = Self::next_request(&peer, to_sample) {
             let sample_request = SampleReq {
                 blob_id: blob_id.to_vec(),
+                column_idx: subnetwork_id,
             };
             outgoing_tasks
                 .push(Self::stream_sample(stream, sample_request, subnetwork_id, blob_id).boxed());

@@ -510,6 +518,7 @@ impl<Membership: MembershipHandler<Id = PeerId, NetworkId = SubnetworkId> + 'sta
         let control = control.clone();
         let sample_request = SampleReq {
             blob_id: blob_id.to_vec(),
+            column_idx: subnetwork_id,
         };
         let with_dial_task: OutgoingStreamHandlerFuture = async move {
             let stream = Self::open_stream(peer, control).await?;

@@ -19,6 +19,7 @@ message SampleErr {
 
 message SampleReq {
   bytes blob_id = 1;
+  uint32 column_idx = 2;
 }
 
 message SampleRes {

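Protobuf has no 16-bit scalar type, which is why the schema carries the index as `uint32` while the Rust side narrows it to a `u16` `ColumnIndex` (hence the overflow TODO above). A round-trip sketch, assuming prost-generated bindings; the actual codegen setup is not shown in this diff:

```rust
// Round-trip sketch assuming prost-generated bindings for SampleReq
// (field types per the schema: bytes -> Vec<u8>, uint32 -> u32).
use prost::Message;

#[derive(Clone, PartialEq, Message)]
pub struct SampleReq {
    #[prost(bytes, tag = "1")]
    pub blob_id: Vec<u8>,
    #[prost(uint32, tag = "2")]
    pub column_idx: u32,
}

fn main() {
    let req = SampleReq { blob_id: vec![0u8; 32], column_idx: 7 };
    let bytes = req.encode_to_vec();
    let decoded = SampleReq::decode(bytes.as_slice()).expect("valid wire format");
    assert_eq!(decoded.column_idx, 7);
}
```
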
@@ -10,7 +10,7 @@ use tokio::{
 
 // TODO: Rocksdb has a feature called BlobDB that handles large blob storing, but further
 // investigation needs to be done to see if rust wrapper supports it.
-pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Vec<Bytes> {
+pub async fn load_blobs(base_dir: PathBuf, blob_id: &[u8]) -> Vec<Bytes> {
     let blob_id = hex::encode(blob_id);
 
     let mut path = base_dir;

@@ -47,6 +47,26 @@ pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Vec<Bytes> {
     blobs
 }
 
+pub async fn load_blob(
+    base_dir: PathBuf,
+    blob_id: &[u8],
+    column_idx: &[u8],
+) -> Result<Bytes, std::io::Error> {
+    let blob_id = hex::encode(blob_id);
+    let column_file = hex::encode(column_idx);
+
+    let mut path = base_dir;
+    path.push(blob_id);
+    path.push(column_file);
+
+    let mut file = tokio::fs::File::open(path).await?;
+
+    let mut data = Vec::new();
+    file.read_to_end(&mut data).await?;
+
+    Ok(Bytes::from(data))
+}
+
 pub async fn write_blob(
     base_dir: PathBuf,
     blob_id: &[u8],

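The new single-column loader implies an on-disk layout of `<base_dir>/<hex(blob_id)>/<hex(column_idx)>`, one file per column. A self-contained round-trip sketch of that layout; encoding the column index via `u16::to_be_bytes` is an assumption, since the diff only shows it arriving as `&[u8]`:

```rust
// Round-trip sketch of the column-per-file layout the new loader expects:
// <base_dir>/<hex(blob_id)>/<hex(column_idx)>.
use bytes::Bytes;
use std::path::PathBuf;
use tokio::io::AsyncReadExt;

async fn load_blob(
    base_dir: PathBuf,
    blob_id: &[u8],
    column_idx: &[u8],
) -> Result<Bytes, std::io::Error> {
    let mut path = base_dir;
    path.push(hex::encode(blob_id));
    path.push(hex::encode(column_idx));

    let mut file = tokio::fs::File::open(path).await?;
    let mut data = Vec::new();
    file.read_to_end(&mut data).await?;
    Ok(Bytes::from(data))
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let base = PathBuf::from("/tmp/da-blobs");
    let (blob_id, column_idx) = ([0xab_u8; 32], 7u16.to_be_bytes());

    // Write a column file where the loader will look for it.
    let dir = base.join(hex::encode(blob_id));
    tokio::fs::create_dir_all(&dir).await?;
    tokio::fs::write(dir.join(hex::encode(column_idx)), b"column bytes").await?;

    let column = load_blob(base, &blob_id, &column_idx).await?;
    assert_eq!(&column[..], b"column bytes");
    Ok(())
}
```
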
@@ -9,7 +9,7 @@ use nomos_core::da::blob::{
     metadata::{Metadata, Next},
 };
 use nomos_core::da::BlobId;
-use nomos_da_storage::fs::load_blob;
+use nomos_da_storage::fs::load_blobs;
 use nomos_da_storage::rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX, DA_VID_KEY_PREFIX};
 use nomos_storage::{
     backends::{rocksdb::RocksBackend, StorageSerde},

@@ -128,7 +128,7 @@ where
 
         futures.push(async move {
             match reply_rx.await {
-                Ok(Some(id)) => (idx, load_blob(settings.blob_storage_directory, &id).await),
+                Ok(Some(id)) => (idx, load_blobs(settings.blob_storage_directory, &id).await),
                 Ok(None) => (idx, Vec::new()),
                 Err(_) => {
                     tracing::error!("Failed to receive storage response");

@@ -1,6 +1,7 @@
 use crate::backends::NetworkBackend;
 use futures::{Stream, StreamExt};
 use kzgrs_backend::common::blob::DaBlob;
+use kzgrs_backend::common::ColumnIndex;
 use libp2p::identity::ed25519;
 use libp2p::{Multiaddr, PeerId};
 use log::error;

@@ -54,6 +55,7 @@ pub enum SamplingEvent {
     /// Incoming sampling request
     SamplingRequest {
         blob_id: BlobId,
+        column_idx: ColumnIndex,
         response_sender: mpsc::Sender<Option<DaBlob>>,
     },
     /// A failed sampling error

@@ -222,11 +224,11 @@ async fn handle_validator_events_stream(
                 }
             }
             sampling::behaviour::SamplingEvent::IncomingSample{request_receiver, response_sender} => {
-                if let Ok(BehaviourSampleReq { blob_id }) = request_receiver.await {
+                if let Ok(BehaviourSampleReq { blob_id, column_idx }) = request_receiver.await {
                     let (sampling_response_sender, mut sampling_response_receiver) = mpsc::channel(1);
 
                     if let Err(e) = sampling_broadcast_sender
-                        .send(SamplingEvent::SamplingRequest { blob_id, response_sender: sampling_response_sender })
+                        .send(SamplingEvent::SamplingRequest { blob_id, column_idx, response_sender: sampling_response_sender })
                     {
                         error!("Error in internal broadcast of sampling request: {e:?}");
                         sampling_response_receiver.close()

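Each incoming sample request gets a fresh bounded channel here: the broadcast event carries the sender, and the protocol side awaits a single reply on the receiver. The relay pattern in isolation, with stand-in types (not the project's actual structs):

```rust
// Relay pattern sketch: forward a request together with a per-request
// response channel, then await exactly one reply.
use tokio::sync::mpsc;

struct SamplingRequest {
    blob_id: [u8; 32],
    column_idx: u16,
    response_sender: mpsc::Sender<Option<Vec<u8>>>,
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, mut broadcast_rx) = mpsc::channel::<SamplingRequest>(16);

    // Service side: answer each relayed request.
    tokio::spawn(async move {
        while let Some(req) = broadcast_rx.recv().await {
            println!("request for blob {:?}…, column {}", &req.blob_id[..4], req.column_idx);
            let _ = req.response_sender.send(Some(vec![req.column_idx as u8])).await;
        }
    });

    // Protocol side: relay one request and await the reply.
    let (response_tx, mut response_rx) = mpsc::channel(1);
    broadcast_tx
        .send(SamplingRequest { blob_id: [0; 32], column_idx: 7, response_sender: response_tx })
        .await
        .expect("service alive");
    assert_eq!(response_rx.recv().await.flatten(), Some(vec![7]));
}
```
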
@@ -142,20 +142,23 @@ where
             }
             SamplingEvent::SamplingRequest {
                 blob_id,
+                column_idx,
                 response_sender,
-            } => match storage_adapter.get_blob(blob_id).await {
-                Ok(maybe_blob) => {
-                    if response_sender.send(maybe_blob).await.is_err() {
-                        error!("Error sending sampling response");
-                    }
-                }
-                Err(error) => {
-                    error!("Failed to get blob from storage adapter: {error}");
-                    if response_sender.send(None).await.is_err() {
-                        error!("Error sending sampling response");
-                    }
-                }
-            },
+            } => {
+                let maybe_blob = storage_adapter
+                    .get_blob(blob_id, column_idx)
+                    .await
+                    .map_err(|error| {
+                        error!("Failed to get blob from storage adapter: {error}");
+                        None::<Backend::Blob>
+                    })
+                    .ok()
+                    .flatten();
+
+                if response_sender.send(maybe_blob).await.is_err() {
+                    error!("Error sending sampling response");
+                }
+            }
         }
     }
 }

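The rewritten arm collapses the old `Ok`/`Err` match into one pipeline: the storage result of type `Result<Option<Blob>, Error>` is logged on error and flattened to `Option<Blob>`, so the response is sent exactly once on a single code path. The collapsing pattern in isolation, with toy types:

```rust
// Result<Option<T>, E> -> Option<T>: log the error, then treat it as "no blob".
fn collapse<T, E: std::fmt::Display>(res: Result<Option<T>, E>) -> Option<T> {
    res.map_err(|error| eprintln!("Failed to get blob from storage adapter: {error}"))
        .ok()      // Result<Option<T>, ()> -> Option<Option<T>>
        .flatten() // Option<Option<T>> -> Option<T>
}

fn main() {
    assert_eq!(collapse::<u8, &str>(Ok(Some(1))), Some(1));
    assert_eq!(collapse::<u8, &str>(Ok(None)), None);
    assert_eq!(collapse::<u8, &str>(Err("boom")), None); // logged, then None
}
```
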
@@ -1,5 +1,6 @@
+use kzgrs_backend::common::ColumnIndex;
 // std
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use std::{marker::PhantomData, path::PathBuf};
 // crates
 use nomos_core::da::blob::Blob;

@@ -27,8 +28,8 @@ where
 impl<B, S> DaStorageAdapter for RocksAdapter<B, S>
 where
     S: StorageSerde + Send + Sync + 'static,
-    B: Blob + Clone + Send + Sync + 'static,
-    B::BlobId: Send,
+    B: Blob + DeserializeOwned + Clone + Send + Sync + 'static,
+    B::BlobId: AsRef<[u8]> + Send,
 {
     type Backend = RocksBackend<S>;
     type Blob = B;

@@ -48,6 +49,7 @@ where
     async fn get_blob(
         &self,
         _blob_id: <Self::Blob as Blob>::BlobId,
+        _column_idx: ColumnIndex,
     ) -> Result<Option<Self::Blob>, DynError> {
         todo!()
     }

@@ -1,5 +1,6 @@
 pub mod adapters;
 
+use kzgrs_backend::common::ColumnIndex;
 use nomos_core::da::blob::Blob;
 use nomos_storage::{backends::StorageBackend, StorageService};
 use overwatch_rs::{

@@ -21,5 +22,6 @@ pub trait DaStorageAdapter {
     async fn get_blob(
         &self,
         blob_id: <Self::Blob as Blob>::BlobId,
+        column_idx: ColumnIndex,
     ) -> Result<Option<Self::Blob>, DynError>;
 }

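With this trait change, every `DaStorageAdapter` implementor receives the column index and can serve a single column rather than a whole blob. A reduced, synchronous sketch of the changed contract; the real trait is async and has more items (an associated `Backend`, a constructor, etc.), and `ColumnIndex` is `u16`, as the `as u16` cast earlier in this commit confirms:

```rust
// Reduced sketch only: storage keyed by (blob_id, column_idx) after this commit.
use std::collections::HashMap;

type ColumnIndex = u16; // stands in for kzgrs_backend::common::ColumnIndex
type DynError = Box<dyn std::error::Error + Send + Sync>;

trait GetBlob {
    type Blob;
    fn get_blob(&self, blob_id: [u8; 32], column_idx: ColumnIndex)
        -> Result<Option<Self::Blob>, DynError>;
}

struct InMemory {
    columns: HashMap<([u8; 32], ColumnIndex), Vec<u8>>,
}

impl GetBlob for InMemory {
    type Blob = Vec<u8>;
    fn get_blob(&self, blob_id: [u8; 32], column_idx: ColumnIndex)
        -> Result<Option<Self::Blob>, DynError> {
        Ok(self.columns.get(&(blob_id, column_idx)).cloned())
    }
}

fn main() {
    let mut columns = HashMap::new();
    columns.insert(([1u8; 32], 7u16), vec![42u8]);
    let store = InMemory { columns };
    assert_eq!(store.get_blob([1u8; 32], 7).unwrap(), Some(vec![42u8]));
    assert_eq!(store.get_blob([1u8; 32], 8).unwrap(), None);
}
```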