chore(encoder): Replace ad-hoc DaBlob iterations (#966)
Parent commit: 1260230898
This commit:   0b3b298e4c
@@ -3,14 +3,12 @@ use std::sync::Arc;
 use std::time::Duration;
 // crates
 use futures::StreamExt;
-use itertools::izip;
 use serde::{Deserialize, Serialize};
 // internal
 use crate::adapters::mempool::DaMempoolAdapter;
 use crate::adapters::network::DispersalNetworkAdapter;
 use crate::backend::DispersalBackend;
-use kzgrs_backend::common::blob::DaBlob;
-use kzgrs_backend::common::{build_blob_id, Column, ColumnIndex};
+use kzgrs_backend::common::build_blob_id;
 use kzgrs_backend::encoder::{DaEncoderParams, EncodedData};
 use kzgrs_backend::{dispersal, encoder};
 use nomos_core::da::{BlobId, DaDispersal, DaEncoder};
@@ -59,14 +57,14 @@ where
             &encoded_data.row_commitments,
         );

-        let reponses_stream = adapter.dispersal_events_stream().await?;
-        for (subnetwork_id, blob) in encoded_data_to_da_blobs(encoded_data).enumerate() {
+        let responses_stream = adapter.dispersal_events_stream().await?;
+        for (subnetwork_id, blob) in encoded_data.into_iter().enumerate() {
             adapter
                 .disperse((subnetwork_id as u16).into(), blob)
                 .await?;
         }

-        let valid_responses = reponses_stream
+        let valid_responses = responses_stream
            .filter_map(|event| async move {
                match event {
                    Ok((_blob_id, _)) if _blob_id == blob_id => Some(()),
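The hunk above cuts off inside the match: non-matching or failed events are simply dropped, leaving a stream of acknowledgements for this blob. As a self-contained illustration of that filter-then-stop pattern, here is a minimal sketch; the event type, `expected_acks`, and the tokio runtime are stand-ins for the example, not the dispersal service's actual types.

use futures::{stream, StreamExt};

// Minimal sketch of the acknowledgement-filtering pattern: keep only Ok
// events whose id matches our blob, then stop once the expected number of
// acknowledgements has arrived. All names here are illustrative.
#[tokio::main]
async fn main() {
    let blob_id = 7u64;
    let expected_acks = 2;
    let events = stream::iter(vec![
        Ok::<(u64, ()), ()>((7, ())), // ack for our blob
        Err(()),                      // transport error: ignored
        Ok((9, ())),                  // ack for another blob: ignored
        Ok((7, ())),                  // second ack for our blob
    ]);
    let acks: Vec<()> = events
        .filter_map(|event| async move {
            match event {
                Ok((id, _)) if id == blob_id => Some(()),
                _ => None,
            }
        })
        .take(expected_acks)
        .collect()
        .await;
    assert_eq!(acks.len(), expected_acks);
}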
@@ -157,35 +155,3 @@ where
         self.mempool_adapter.post_blob_id(blob_id, metadata).await
     }
 }
-
-fn encoded_data_to_da_blobs(encoded_data: EncodedData) -> impl Iterator<Item = DaBlob> {
-    let EncodedData {
-        extended_data,
-        row_commitments,
-        rows_proofs,
-        column_commitments,
-        aggregated_column_commitment,
-        aggregated_column_proofs,
-        ..
-    } = encoded_data;
-    let iter = izip!(
-        // transpose and unwrap the types as we need to have ownership of it
-        extended_data.transposed().0.into_iter().map(|r| r.0),
-        column_commitments.into_iter(),
-        aggregated_column_proofs.into_iter(),
-    );
-    iter.enumerate().map(
-        move |(column_idx, (column, column_commitment, aggregated_column_proof))| DaBlob {
-            column: Column(column),
-            column_idx: column_idx as ColumnIndex,
-            column_commitment,
-            aggregated_column_commitment,
-            aggregated_column_proof,
-            rows_commitments: row_commitments.clone(),
-            rows_proofs: rows_proofs
-                .iter()
-                .map(|proofs| proofs[column_idx])
-                .collect(),
-        },
-    )
-}
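The deleted helper's logic did not disappear: the new `encoded_data.into_iter()` call implies that `EncodedData` now implements `IntoIterator` (yielding one `DaBlob` per column) inside `kzgrs_backend`, next to the type itself. That impl is not part of this diff; the following is a minimal sketch of what it presumably looks like, reconstructed from the deleted helper. The boxed `IntoIter` type is an assumption, and the by-value use of the commitment and proof types assumes they are `Copy`, as the helper's own code implied.

// Sketch only: presumed IntoIterator impl as it would live inside
// kzgrs_backend, reusing the logic of the deleted helper. The crate's
// actual Item/IntoIter types may differ.
use itertools::izip;

use crate::common::blob::DaBlob;
use crate::common::{Column, ColumnIndex};
use crate::encoder::EncodedData;

impl IntoIterator for EncodedData {
    type Item = DaBlob;
    type IntoIter = Box<dyn Iterator<Item = DaBlob>>;

    fn into_iter(self) -> Self::IntoIter {
        let EncodedData {
            extended_data,
            row_commitments,
            rows_proofs,
            column_commitments,
            aggregated_column_commitment,
            aggregated_column_proofs,
            ..
        } = self;
        // Transpose so each item owns one column together with its
        // commitment and aggregated proof, exactly as the helper did.
        let columns = extended_data.transposed().0.into_iter().map(|r| r.0);
        Box::new(
            izip!(columns, column_commitments, aggregated_column_proofs)
                .enumerate()
                .map(move |(idx, (column, column_commitment, aggregated_column_proof))| DaBlob {
                    column: Column(column),
                    column_idx: idx as ColumnIndex,
                    column_commitment,
                    aggregated_column_commitment,
                    aggregated_column_proof,
                    rows_commitments: row_commitments.clone(),
                    rows_proofs: rows_proofs.iter().map(|proofs| proofs[idx]).collect(),
                }),
        )
    }
}

Moving the iteration next to the type is also what explains the first hunk: the dispersal service no longer needs `itertools::izip`, `DaBlob`, `Column`, or `ColumnIndex`, since it only names `build_blob_id` and treats `EncodedData` as an opaque iterable.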