1
0
mirror of synced 2025-01-25 15:09:07 +00:00

Add API to return DA blobs (#453)

* Add API to return DA blobs

* remove pending blobs API
This commit is contained in:
Giacomo Pasini 2023-10-09 10:53:01 +02:00 committed by GitHub
parent f04c4a6492
commit 2016f75213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 21 deletions

View File

@ -37,12 +37,6 @@ where
pub async fn remove(&self, hash: &B::Hash) {
self.0.remove(hash).await;
}
pub fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send> {
// bypass lifetime
let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect();
Box::new(blobs.into_iter())
}
}
#[async_trait::async_trait]
@ -68,7 +62,7 @@ where
Ok(())
}
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send> {
BlobCache::pending_blobs(self)
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob> {
self.0.get(id)
}
}

View File

@ -20,5 +20,5 @@ pub trait DaBackend {
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
fn get_blob(&self, id: &<Self::Blob as Blob>::Hash) -> Option<Self::Blob>;
}

View File

@ -3,6 +3,7 @@ pub mod network;
// std
use overwatch_rs::DynError;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
@ -32,23 +33,24 @@ where
}
pub enum DaMsg<B: Blob> {
PendingBlobs {
reply_channel: Sender<Box<dyn Iterator<Item = B> + Send>>,
},
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
},
Get {
ids: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
reply_channel: Sender<HashMap<<B as Blob>::Hash, B>>,
},
}
impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaMsg::PendingBlobs { .. } => {
write!(f, "DaMsg::PendingBlobs")
}
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
DaMsg::Get { .. } => {
write!(f, "DaMsg::Get")
}
}
}
}
@ -156,12 +158,6 @@ where
<B::Blob as Blob>::Hash: Debug,
{
match msg {
DaMsg::PendingBlobs { reply_channel } => {
let pending_blobs = backend.pending_blobs();
if reply_channel.send(pending_blobs).is_err() {
tracing::debug!("Could not send pending blobs");
}
}
DaMsg::RemoveBlobs { blobs } => {
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
@ -171,6 +167,14 @@ where
})
.await;
}
DaMsg::Get { ids, reply_channel } => {
let res = ids
.filter_map(|id| backend.get_blob(&id).map(|blob| (id, blob)))
.collect();
if reply_channel.send(res).is_err() {
tracing::error!("Could not returns blobs");
}
}
}
Ok(())
}