Da service backend (#381)

* Add basic da module and traits

* Pipe new blobs and internal message handling

* Add and pipe send attestation method

* Add blob trait

* Make da backend async

* Implement mocka backend

* Bound blob in da backend to blob trait

* Added remove blob

* Rename reply to attestation
This commit is contained in:
Daniel Sanchez 2023-09-08 11:27:52 +02:00 committed by GitHub
parent a79d6c52e6
commit 96e3c2d499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 133 additions and 14 deletions

View File

@ -0,0 +1,13 @@
use bytes::Bytes;
use std::hash::Hash;
pub type BlobHasher<T> = fn(&T) -> <T as Blob>::Hash;
pub trait Blob {
const HASHER: BlobHasher<Self>;
type Hash: Hash + Eq + Clone;
fn hash(&self) -> Self::Hash {
Self::HASHER(self)
}
fn as_bytes(&self) -> Bytes;
}

View File

@ -1,4 +1,5 @@
pub mod account; pub mod account;
pub mod blob;
pub mod block; pub mod block;
pub mod crypto; pub mod crypto;
pub mod fountain; pub mod fountain;

View File

@ -8,6 +8,8 @@ edition = "2021"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
futures = "0.3" futures = "0.3"
moka = { version = "0.11", features = ["future"] }
nomos-core = { path = "../../nomos-core" }
nomos-network = { path = "../network" } nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
tracing = "0.1" tracing = "0.1"

View File

@ -0,0 +1,73 @@
use crate::backend::{DaBackend, DaError};
use moka::future::{Cache, CacheBuilder};
use nomos_core::blob::Blob;
use std::time::Duration;
#[derive(Clone, Copy)]
pub struct BlobCacheSettings {
max_capacity: usize,
evicting_period: Duration,
}
pub struct BlobCache<H, B>(Cache<H, B>);
impl<B> BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
pub fn new(settings: BlobCacheSettings) -> Self {
let BlobCacheSettings {
max_capacity,
evicting_period,
} = settings;
let cache = CacheBuilder::new(max_capacity as u64)
.time_to_live(evicting_period)
// can we leverage this to evict really old blobs?
.time_to_idle(evicting_period)
.build();
Self(cache)
}
pub async fn add(&self, blob: B) {
self.0.insert(blob.hash(), blob).await
}
pub async fn remove(&self, hash: &B::Hash) {
self.0.remove(hash).await;
}
pub fn pending_blobs(&self) -> Box<dyn Iterator<Item = B> + Send> {
// bypass lifetime
let blobs: Vec<_> = self.0.iter().map(|t| t.1).collect();
Box::new(blobs.into_iter())
}
}
#[async_trait::async_trait]
impl<B> DaBackend for BlobCache<B::Hash, B>
where
B: Clone + Blob + Send + Sync + 'static,
B::Hash: Send + Sync + 'static,
{
type Settings = BlobCacheSettings;
type Blob = B;
fn new(settings: Self::Settings) -> Self {
BlobCache::new(settings)
}
async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError> {
self.add(blob).await;
Ok(())
}
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError> {
self.remove(blob).await;
Ok(())
}
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send> {
BlobCache::pending_blobs(self)
}
}

View File

@ -1,3 +1,6 @@
mod memory_cache;
use nomos_core::blob::Blob;
use overwatch_rs::DynError; use overwatch_rs::DynError;
#[derive(Debug)] #[derive(Debug)]
@ -5,14 +8,17 @@ pub enum DaError {
Dyn(DynError), Dyn(DynError),
} }
#[async_trait::async_trait]
pub trait DaBackend { pub trait DaBackend {
type Settings: Clone; type Settings: Clone;
type Blob; type Blob: Blob;
fn new(settings: Self::Settings) -> Self; fn new(settings: Self::Settings) -> Self;
fn add_blob(&mut self, blob: Self::Blob) -> Result<(), DaError>; async fn add_blob(&self, blob: Self::Blob) -> Result<(), DaError>;
async fn remove_blob(&self, blob: &<Self::Blob as Blob>::Hash) -> Result<(), DaError>;
fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>; fn pending_blobs(&self) -> Box<dyn Iterator<Item = Self::Blob> + Send>;
} }

View File

@ -10,6 +10,7 @@ use tokio::sync::oneshot::Sender;
// internal // internal
use crate::backend::{DaBackend, DaError}; use crate::backend::{DaBackend, DaError};
use crate::network::NetworkAdapter; use crate::network::NetworkAdapter;
use nomos_core::blob::Blob;
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::relay::{Relay, RelayMessage};
@ -27,23 +28,29 @@ where
network_relay: Relay<NetworkService<N::Backend>>, network_relay: Relay<NetworkService<N::Backend>>,
} }
pub enum DaMsg<Blob> { pub enum DaMsg<B: Blob> {
PendingBlobs { PendingBlobs {
reply_channel: Sender<Box<dyn Iterator<Item = Blob> + Send>>, reply_channel: Sender<Box<dyn Iterator<Item = B> + Send>>,
},
RemoveBlobs {
blobs: Box<dyn Iterator<Item = <B as Blob>::Hash> + Send>,
}, },
} }
impl<Blob: 'static> Debug for DaMsg<Blob> { impl<B: Blob + 'static> Debug for DaMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self { match self {
DaMsg::PendingBlobs { .. } => { DaMsg::PendingBlobs { .. } => {
write!(f, "DaMsg::PendingBlobs") write!(f, "DaMsg::PendingBlobs")
} }
DaMsg::RemoveBlobs { .. } => {
write!(f, "DaMsg::RemoveBlobs")
}
} }
} }
} }
impl<Blob: 'static> RelayMessage for DaMsg<Blob> {} impl<B: Blob + 'static> RelayMessage for DaMsg<B> {}
impl<B, N> ServiceData for DataAvailabilityService<B, N> impl<B, N> ServiceData for DataAvailabilityService<B, N>
where where
@ -61,11 +68,12 @@ where
#[async_trait::async_trait] #[async_trait::async_trait]
impl<B, N> ServiceCore for DataAvailabilityService<B, N> impl<B, N> ServiceCore for DataAvailabilityService<B, N>
where where
B: DaBackend + Send, B: DaBackend + Send + Sync,
B::Settings: Clone + Send + Sync + 'static, B::Settings: Clone + Send + Sync + 'static,
B::Blob: Send, B::Blob: Send,
<B::Blob as Blob>::Hash: Debug + Send + Sync,
// TODO: Reply type must be piped together, for now empty array. // TODO: Reply type must be piped together, for now empty array.
N: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]> + Send + Sync, N: NetworkAdapter<Blob = B::Blob, Attestation = [u8; 32]> + Send + Sync,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
@ -109,20 +117,26 @@ where
} }
} }
async fn handle_new_blob<B: DaBackend, A: NetworkAdapter<Blob = B::Blob, Reply = [u8; 32]>>( async fn handle_new_blob<
B: DaBackend,
A: NetworkAdapter<Blob = B::Blob, Attestation = [u8; 32]>,
>(
backend: &mut B, backend: &mut B,
adapter: &A, adapter: &A,
blob: B::Blob, blob: B::Blob,
) -> Result<(), DaError> { ) -> Result<(), DaError> {
// we need to handle the reply (verification + signature) // we need to handle the reply (verification + signature)
backend.add_blob(blob)?; backend.add_blob(blob).await?;
adapter adapter
.send_attestation([0u8; 32]) .send_attestation([0u8; 32])
.await .await
.map_err(DaError::Dyn) .map_err(DaError::Dyn)
} }
async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError> { async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Result<(), DaError>
where
<B::Blob as Blob>::Hash: Debug,
{
match msg { match msg {
DaMsg::PendingBlobs { reply_channel } => { DaMsg::PendingBlobs { reply_channel } => {
let pending_blobs = backend.pending_blobs(); let pending_blobs = backend.pending_blobs();
@ -130,6 +144,16 @@ async fn handle_da_msg<B: DaBackend>(backend: &mut B, msg: DaMsg<B::Blob>) -> Re
tracing::debug!("Could not send pending blobs"); tracing::debug!("Could not send pending blobs");
} }
} }
DaMsg::RemoveBlobs { blobs } => {
let backend = &*backend;
futures::stream::iter(blobs)
.for_each_concurrent(None, |blob| async move {
if let Err(e) = backend.remove_blob(&blob).await {
tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
}
})
.await;
}
} }
Ok(()) Ok(())
} }

View File

@ -1,19 +1,19 @@
// std // std
// crates // crates
use futures::Stream; use futures::Stream;
use overwatch_rs::DynError;
// internal // internal
use nomos_network::backends::NetworkBackend; use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData; use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait NetworkAdapter { pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static; type Backend: NetworkBackend + 'static;
type Blob: Send + Sync + 'static; type Blob: Send + Sync + 'static;
type Reply: Send + Sync + 'static; type Attestation: Send + Sync + 'static;
async fn new( async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>, network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
@ -21,5 +21,5 @@ pub trait NetworkAdapter {
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>; async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
async fn send_attestation(&self, attestation: Self::Reply) -> Result<(), DynError>; async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
} }