Implemented get_blob on sampling rocksdb adapter (#732)
* Implemented get_blob on sampling rocksdb adapter * addressed PR comments
This commit is contained in:
parent
9835d09b8c
commit
49ac81b950
|
@ -1,4 +1,8 @@
|
|||
use kzgrs_backend::common::ColumnIndex;
|
||||
use nomos_da_storage::{
|
||||
fs::load_blob,
|
||||
rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX},
|
||||
};
|
||||
// std
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{marker::PhantomData, path::PathBuf};
|
||||
|
@ -19,8 +23,8 @@ pub struct RocksAdapter<B, S>
|
|||
where
|
||||
S: StorageSerde + Send + Sync + 'static,
|
||||
{
|
||||
_settings: RocksAdapterSettings,
|
||||
_storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
|
||||
settings: RocksAdapterSettings,
|
||||
storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
|
||||
blob: PhantomData<B>,
|
||||
}
|
||||
|
||||
|
@ -36,22 +40,43 @@ where
|
|||
type Settings = RocksAdapterSettings;
|
||||
|
||||
async fn new(
|
||||
_settings: Self::Settings,
|
||||
_storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
|
||||
settings: Self::Settings,
|
||||
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
|
||||
) -> Self {
|
||||
Self {
|
||||
_settings,
|
||||
_storage_relay,
|
||||
settings,
|
||||
storage_relay,
|
||||
blob: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_blob(
|
||||
&self,
|
||||
_blob_id: <Self::Blob as Blob>::BlobId,
|
||||
_column_idx: ColumnIndex,
|
||||
blob_id: <Self::Blob as Blob>::BlobId,
|
||||
column_idx: ColumnIndex,
|
||||
) -> Result<Option<Self::Blob>, DynError> {
|
||||
todo!()
|
||||
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
|
||||
self.storage_relay
|
||||
.send(StorageMsg::Load {
|
||||
key: key_bytes(DA_VERIFIED_KEY_PREFIX, &blob_id),
|
||||
reply_channel: reply_tx,
|
||||
})
|
||||
.await
|
||||
.expect("failed to send Load message to storage relay");
|
||||
|
||||
if reply_rx.await?.is_some() {
|
||||
let blob_bytes = load_blob(
|
||||
self.settings.blob_storage_directory.clone(),
|
||||
blob_id.as_ref(),
|
||||
&column_idx.to_be_bytes(),
|
||||
)
|
||||
.await?;
|
||||
Ok(S::deserialize(blob_bytes)
|
||||
.map(|blob| Some(blob))
|
||||
.unwrap_or_default())
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue