1
0
mirror of synced 2025-02-02 02:46:39 +00:00

DA: Store and load multiple columns for the same blob in a single node (#694)

* ColumnIndex type in Blob trait

* Store blob id and column after verification

* Use blob_id+column_idx storage scheme in indexer

* Store and load multiple columns for the same blob

* Add comments
This commit is contained in:
gusto 2024-08-21 16:23:36 +03:00 committed by GitHub
parent 2ca822e6ec
commit 3671376691
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 170 additions and 62 deletions

View File

@ -75,6 +75,7 @@ where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
C: DispersedBlobInfo<BlobId = [u8; 32]>
+ Clone
+ Debug
@ -327,6 +328,7 @@ where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,

View File

@ -4,8 +4,10 @@ pub mod select;
/// Common interface for blob types: exposes an identifier and a column index.
///
/// Both values are returned by value through associated types, so each
/// implementor chooses its own concrete representation.
pub trait Blob {
/// Type returned by [`Blob::id`].
type BlobId;
/// Type returned by [`Blob::column_idx`].
type ColumnIndex;
/// Returns the identifier of this blob.
fn id(&self) -> Self::BlobId;
/// Returns the column index associated with this blob.
fn column_idx(&self) -> Self::ColumnIndex;
}
pub trait BlobSelect {

View File

@ -1,7 +1,10 @@
pub mod blob;
// crates
// internal
use blob::Blob;
pub mod blob;
pub type BlobId = [u8; 32];
pub trait DaEncoder {
type EncodedData;
@ -11,7 +14,7 @@ pub trait DaEncoder {
}
pub trait DaVerifier {
type DaBlob;
type DaBlob: Blob;
type Error;
fn verify(&self, blob: &Self::DaBlob) -> Result<(), Self::Error>;

View File

@ -57,9 +57,14 @@ impl DaBlob {
}
impl blob::Blob for DaBlob {
type BlobId = Vec<u8>;
type BlobId = [u8; 32];
type ColumnIndex = [u8; 2];
fn id(&self) -> Self::BlobId {
build_blob_id(&self.aggregated_column_commitment, &self.rows_commitments).into()
build_blob_id(&self.aggregated_column_commitment, &self.rows_commitments)
}
fn column_idx(&self) -> Self::ColumnIndex {
self.column_idx.to_be_bytes()
}
}

View File

@ -3,45 +3,62 @@ use std::path::PathBuf;
// crates
use bytes::Bytes;
use tokio::{
fs::{File, OpenOptions},
fs::{self, File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
};
// internal
// TODO: Rocksdb has a feature called BlobDB that handles large blob storing, but further
// investigation needs to be done to see if rust wrapper supports it.
pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Option<Bytes> {
pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Vec<Bytes> {
let blob_id = hex::encode(blob_id);
let mut path = base_dir;
path.push(blob_id);
let mut file = match File::open(path).await {
Ok(file) => file,
let mut blobs = Vec::new();
let mut column_files = match fs::read_dir(&path).await {
Ok(entries) => entries,
Err(e) => {
tracing::error!("Failed to open file: {}", e);
return None;
tracing::error!("Failed to read directory: {}", e);
return blobs;
}
};
let mut contents = vec![];
if let Err(e) = file.read_to_end(&mut contents).await {
tracing::error!("Failed to read file: {}", e);
return None;
while let Some(entry) = column_files.next_entry().await.ok().flatten() {
let mut file = match File::open(entry.path()).await {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to open file: {}", e);
continue;
}
};
let mut contents = vec![];
if let Err(e) = file.read_to_end(&mut contents).await {
tracing::error!("Failed to read file: {}", e);
continue;
}
blobs.push(Bytes::from(contents));
}
Some(Bytes::from(contents))
blobs
}
pub async fn write_blob(
base_dir: PathBuf,
blob_id: &[u8],
column_idx: &[u8],
data: &[u8],
) -> Result<(), std::io::Error> {
let blob_id = hex::encode(blob_id);
let column_file = hex::encode(column_idx);
let mut path = base_dir;
path.push(blob_id);
path.push(column_file);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;

View File

@ -1,7 +1,7 @@
use bytes::{Bytes, BytesMut};
pub const DA_VID_KEY_PREFIX: &str = "da/vid/";
pub const DA_ATTESTED_KEY_PREFIX: &str = "da/attested/";
pub const DA_VERIFIED_KEY_PREFIX: &str = "da/verified/";
pub fn key_bytes(prefix: &str, id: impl AsRef<[u8]>) -> Bytes {
let mut buffer = BytesMut::new();

View File

@ -54,6 +54,7 @@ where
A: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
<B as Blob>::BlobId: AsRef<[u8]> + Send + Sync + 'static,
<B as Blob>::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
VB: VerifierBackend + CoreDaVerifier<DaBlob = B>,
<VB as VerifierBackend>::Settings: Clone,
<VB as CoreDaVerifier>::Error: Error,
@ -76,7 +77,7 @@ pub async fn get_range<Tx, C, V, SS, const SIZE: usize>(
handle: &OverwatchHandle,
app_id: <V as Metadata>::AppId,
range: Range<<V as Metadata>::Index>,
) -> Result<Vec<(<V as Metadata>::Index, Option<Bytes>)>, DynError>
) -> Result<Vec<(<V as Metadata>::Index, Vec<Bytes>)>, DynError>
where
Tx: Transaction
+ Eq

View File

@ -74,7 +74,7 @@ pub enum DaMsg<B, V: Metadata> {
GetRange {
app_id: <V as Metadata>::AppId,
range: Range<<V as Metadata>::Index>,
reply_channel: Sender<Vec<(<V as Metadata>::Index, Option<B>)>>,
reply_channel: Sender<Vec<(<V as Metadata>::Index, Vec<B>)>>,
},
}

View File

@ -7,8 +7,9 @@ use nomos_core::da::blob::{
info::DispersedBlobInfo,
metadata::{Metadata, Next},
};
use nomos_core::da::BlobId;
use nomos_da_storage::fs::load_blob;
use nomos_da_storage::rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX, DA_VID_KEY_PREFIX};
use nomos_da_storage::rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX, DA_VID_KEY_PREFIX};
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
@ -34,7 +35,7 @@ where
impl<S, B> DaStorageAdapter for RocksAdapter<S, B>
where
S: StorageSerde + Send + Sync + 'static,
B: DispersedBlobInfo<BlobId = [u8; 32]> + Metadata + Send + Sync,
B: DispersedBlobInfo<BlobId = BlobId> + Metadata + Send + Sync,
B::Index: AsRef<[u8]> + Next + Clone + PartialOrd + Send + Sync + 'static,
B::AppId: AsRef<[u8]> + Clone + Send + Sync + 'static,
{
@ -58,19 +59,19 @@ where
let (app_id, idx) = info.metadata();
// Check if Info in a block is something that the node has seen before.
let attested_key = key_bytes(DA_ATTESTED_KEY_PREFIX, info.blob_id());
let attested_key = key_bytes(DA_VERIFIED_KEY_PREFIX, info.blob_id());
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
.send(StorageMsg::Load {
key: attested_key,
.send(StorageMsg::LoadPrefix {
prefix: attested_key,
reply_channel: reply_tx,
})
.await
.expect("Failed to send load request to storage relay");
// If the node hasn't attested this info, return early.
if reply_rx.await?.is_none() {
if reply_rx.await?.is_empty() {
return Ok(());
}
@ -95,8 +96,7 @@ where
&self,
app_id: <Self::Info as Metadata>::AppId,
index_range: Range<<Self::Info as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Option<Bytes>)> + Unpin + Send>
{
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Vec<Bytes>)> + Unpin + Send> {
let futures = FuturesUnordered::new();
// TODO: Using while loop here until `Step` trait is stable.
@ -127,10 +127,10 @@ where
futures.push(async move {
match reply_rx.await {
Ok(Some(id)) => (idx, load_blob(settings.blob_storage_directory, &id).await),
Ok(None) => (idx, None),
Ok(None) => (idx, Vec::new()),
Err(_) => {
tracing::error!("Failed to receive storage response");
(idx, None)
(idx, Vec::new())
}
}
});

View File

@ -28,5 +28,5 @@ pub trait DaStorageAdapter {
&self,
app_id: <Self::Info as Metadata>::AppId,
range: Range<<Self::Info as Metadata>::Index>,
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Option<Self::Blob>)> + Unpin + Send>;
) -> Box<dyn Stream<Item = (<Self::Info as Metadata>::Index, Vec<Self::Blob>)> + Unpin + Send>;
}

View File

@ -17,6 +17,7 @@ use nomos_core::tx::Transaction;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings;
use nomos_da_indexer::IndexerSettings;
use nomos_da_storage::fs::write_blob;
use nomos_da_storage::rocksdb::DA_VERIFIED_KEY_PREFIX;
use nomos_libp2p::{Multiaddr, SwarmConfig};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
@ -197,11 +198,17 @@ fn test_indexer() {
let _hash3 = <BlobInfo as Hash>::hash(&blob_info, &mut default_hasher);
let expected_blob_info = blob_info.clone();
let col_idx = (0 as u16).to_be_bytes();
// Mock attestation step where blob is persisted in nodes blob storage.
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(write_blob(blobs_dir, blob_info.blob_id().as_ref(), b"blob"))
.unwrap();
rt.block_on(write_blob(
blobs_dir,
blob_info.blob_id().as_ref(),
&col_idx,
b"blob",
))
.unwrap();
node1.spawn(async move {
let mempool_outbound = mempool.connect().await.unwrap();
@ -222,8 +229,9 @@ fn test_indexer() {
});
// Mock attested blob by writing directly into the da storage.
let mut attested_key = Vec::from(b"da/attested/" as &[u8]);
let mut attested_key = Vec::from(DA_VERIFIED_KEY_PREFIX.as_bytes());
attested_key.extend_from_slice(&blob_hash);
attested_key.extend_from_slice(&col_idx);
storage_outbound
.send(nomos_storage::StorageMsg::Store {
@ -281,10 +289,12 @@ fn test_indexer() {
// item should have "some" data, other indexes should be empty.
app_id_blobs.sort_by(|(a, _), (b, _)| a.partial_cmp(b).unwrap());
let app_id_blobs = app_id_blobs.iter().map(|(_, b)| b).collect::<Vec<_>>();
if let Some(blob) = app_id_blobs[0] {
if **blob == *b"blob" && app_id_blobs[1].is_none() {
is_success_tx.store(true, SeqCst);
}
// When Indexer is asked for app_id at index, it will return all blobs that it has for that
// blob_id.
let columns = app_id_blobs[0];
if !columns.is_empty() && *columns[0] == *b"blob" && app_id_blobs[1].is_empty() {
is_success_tx.store(true, SeqCst);
}
performed_tx.store(true, SeqCst);

View File

@ -2,6 +2,7 @@ pub mod backend;
pub mod network;
pub mod storage;
use nomos_core::da::blob::Blob;
// std
use nomos_storage::StorageService;
use overwatch_rs::services::life_cycle::LifecycleMessage;
@ -73,7 +74,7 @@ where
storage_adapter: &S,
blob: &Backend::DaBlob,
) -> Result<(), DynError> {
if storage_adapter.get_attestation(blob).await?.is_some() {
if storage_adapter.get_blob(blob.id()).await?.is_some() {
Ok(())
} else {
verifier.verify(blob)?;
@ -179,19 +180,19 @@ where
}
}
Some(msg) = service_state.inbound_relay.recv() => {
let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
error!("Error replying attestation {err:?}");
},
Err(err) => {
error!("Error handling blob {blob:?} due to {err:?}");
if let Err(err) = reply_channel.send(None) {
let DaVerifierMsg::AddBlob { blob, reply_channel } = msg;
match Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
Ok(attestation) => if let Err(err) = reply_channel.send(Some(attestation)) {
error!("Error replying attestation {err:?}");
}
},
};
}
},
Err(err) => {
error!("Error handling blob {blob:?} due to {err:?}");
if let Err(err) = reply_channel.send(None) {
error!("Error replying attestation {err:?}");
}
},
};
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;

View File

@ -5,7 +5,7 @@ use std::{marker::PhantomData, path::PathBuf};
use nomos_core::da::blob::Blob;
use nomos_da_storage::{
fs::write_blob,
rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX},
rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX},
};
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
@ -34,6 +34,7 @@ where
A: Serialize + DeserializeOwned + Clone + Send + Sync,
B: Blob + Serialize + Clone + Send + Sync + 'static,
B::BlobId: AsRef<[u8]> + Send + Sync + 'static,
B::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
{
type Backend = RocksBackend<S>;
@ -59,16 +60,18 @@ where
attestation: &Self::Attestation,
) -> Result<(), DynError> {
let blob_bytes = S::serialize(blob);
let blob_idx = create_blob_idx(blob.id().as_ref(), blob.column_idx().as_ref());
write_blob(
self.settings.blob_storage_directory.clone(),
blob.id().as_ref(),
blob.column_idx().as_ref(),
&blob_bytes,
)
.await?;
// Mark blob as attested for later use in Indexer and attestation cache.
let blob_key = key_bytes(DA_ATTESTED_KEY_PREFIX, blob.id().as_ref());
let blob_key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx);
self.storage_relay
.send(StorageMsg::Store {
key: blob_key,
@ -78,17 +81,14 @@ where
.map_err(|(e, _)| e.into())
}
async fn get_attestation(
async fn get_blob(
&self,
blob: &Self::Blob,
blob_idx: <Self::Blob as Blob>::BlobId,
) -> Result<Option<Self::Attestation>, DynError> {
let attestation_key = key_bytes(DA_ATTESTED_KEY_PREFIX, blob.id().as_ref());
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
let key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx);
let (reply_channel, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
.send(StorageMsg::Load {
key: attestation_key,
reply_channel: reply_tx,
})
.send(StorageMsg::Load { key, reply_channel })
.await
.expect("Failed to send load request to storage relay");
@ -107,6 +107,16 @@ where
}
}
/// Builds the 34-byte storage index for a single column of a blob.
///
/// The result is `blob_id` immediately followed by `column_idx`; callers are
/// expected to pass a 32-byte blob ID and a 2-byte (big-endian `u16`) column
/// index.
///
/// # Panics
///
/// Panics if `blob_id.len() + column_idx.len()` is not exactly 34.
fn create_blob_idx(blob_id: &[u8], column_idx: &[u8]) -> [u8; 34] {
    let mut combined = [0u8; 34];
    // Split the buffer at the end of the blob ID so each half can be filled
    // independently; both copies require an exact length match.
    let (id_part, column_part) = combined.split_at_mut(blob_id.len());
    id_part.copy_from_slice(blob_id);
    column_part.copy_from_slice(column_idx);
    combined
}
#[derive(Debug, Clone)]
pub struct RocksAdapterSettings {
pub blob_storage_directory: PathBuf,

View File

@ -1,5 +1,6 @@
pub mod adapters;
use nomos_core::da::blob::Blob;
use nomos_storage::{backends::StorageBackend, StorageService};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
@ -10,7 +11,7 @@ use overwatch_rs::{
pub trait DaStorageAdapter {
type Backend: StorageBackend + Send + Sync + 'static;
type Settings: Clone;
type Blob: Clone;
type Blob: Blob + Clone;
type Attestation: Clone;
async fn new(
@ -23,8 +24,9 @@ pub trait DaStorageAdapter {
blob: &Self::Blob,
attestation: &Self::Attestation,
) -> Result<(), DynError>;
async fn get_attestation(
async fn get_blob(
&self,
blob: &Self::Blob,
blob_id: <Self::Blob as Blob>::BlobId,
) -> Result<Option<Self::Attestation>, DynError>;
}

View File

@ -54,6 +54,10 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for MockStora
Ok(self.inner.get(key).cloned())
}
async fn load_prefix(&mut self, _key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
unimplemented!()
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
Ok(self.inner.remove(key))
}

View File

@ -45,6 +45,8 @@ pub trait StorageBackend: Sized {
fn new(config: Self::Settings) -> Result<Self, Self::Error>;
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error>;
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;
/// Loads all values whose keys start with the given prefix.
async fn load_prefix(&mut self, prefix: &[u8]) -> Result<Vec<Bytes>, Self::Error>;
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;
/// Execute a transaction in the current backend
async fn execute(

View File

@ -117,6 +117,22 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for RocksBack
self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into()))
}
/// Loads the values of all entries whose key starts with `prefix`.
///
/// Values are returned in the order the RocksDB iterator yields them. An
/// empty vector means no stored key begins with `prefix`.
///
/// # Errors
///
/// Returns the first error reported by the underlying iterator.
async fn load_prefix(&mut self, prefix: &[u8]) -> Result<Vec<Bytes>, Self::Error> {
    let mut values = Vec::new();
    for item in self.rocks.prefix_iterator(prefix) {
        let (key, value) = item?;
        // `prefix_iterator` only guarantees a seek to the first key >= `prefix`.
        // Unless the database has a matching prefix extractor configured, it
        // keeps yielding keys past the prefix range. Keys arrive in sorted
        // order, so the first non-matching key ends the range.
        if !key.starts_with(prefix) {
            break;
        }
        values.push(Bytes::from(value.to_vec()));
    }
    Ok(values)
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.load(key).await.and_then(|val| {
if val.is_some() {

View File

@ -75,6 +75,10 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for SledBacke
Ok(self.sled.get(key)?.map(|ivec| ivec.to_vec().into()))
}
async fn load_prefix(&mut self, _key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
unimplemented!()
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
Ok(self.sled.remove(key)?.map(|ivec| ivec.to_vec().into()))
}

View File

@ -25,6 +25,10 @@ pub enum StorageMsg<Backend: StorageBackend> {
key: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Option<Bytes>>,
},
LoadPrefix {
prefix: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Vec<Bytes>>,
},
Store {
key: Bytes,
value: Bytes,
@ -139,6 +143,9 @@ impl<Backend: StorageBackend> Debug for StorageMsg<Backend> {
StorageMsg::Load { key, .. } => {
write!(f, "Load {{ {key:?} }}")
}
StorageMsg::LoadPrefix { prefix, .. } => {
write!(f, "LoadPrefix {{ {prefix:?} }}")
}
StorageMsg::Store { key, value } => {
write!(f, "Store {{ {key:?}, {value:?}}}")
}
@ -189,6 +196,10 @@ impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
StorageMsg::Load { key, reply_channel } => {
Self::handle_load(backend, key, reply_channel).await
}
StorageMsg::LoadPrefix {
prefix,
reply_channel,
} => Self::handle_load_prefix(backend, prefix, reply_channel).await,
StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
StorageMsg::Remove { key, reply_channel } => {
Self::handle_remove(backend, key, reply_channel).await
@ -220,6 +231,24 @@ impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
})
}
/// Serves a `LoadPrefix` request.
///
/// Asks the backend for every value stored under `prefix` and forwards the
/// result through `reply_channel`.
///
/// # Errors
///
/// * `StorageServiceError::BackendError` if the backend lookup fails.
/// * `StorageServiceError::ReplyError` if the reply could not be delivered on
///   `reply_channel`.
async fn handle_load_prefix(
    backend: &mut Backend,
    prefix: Bytes,
    reply_channel: tokio::sync::oneshot::Sender<Vec<Bytes>>,
) -> Result<(), StorageServiceError<Backend>> {
    let values: Vec<Bytes> = match backend.load_prefix(&prefix).await {
        Ok(values) => values,
        Err(err) => return Err(StorageServiceError::BackendError(err)),
    };

    if reply_channel.send(values).is_err() {
        return Err(StorageServiceError::ReplyError {
            operation: "LoadPrefix".to_string(),
            key: prefix,
        });
    }
    Ok(())
}
/// Handle remove message
async fn handle_remove(
backend: &mut Backend,