commit cd327d69ae
parent b105b2541c

    first attempt at storage relay...
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ libp2p-identity = { version = "0.2" }
 nomos-core = { path = "../../../nomos-core" }
 nomos-da-network-core = { path = "../../../nomos-da/network/core" }
 nomos-da-network-service = { path = "../../../nomos-services/data-availability/network" }
 nomos-da-verifier = { path = "../verifier" }
+nomos-storage = { path = "../../../nomos-services/storage" }
 overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod backend;
 pub mod network;
+pub mod storage;
 
 // std
 use std::collections::BTreeSet;
@@ -7,23 +8,26 @@ use std::fmt::Debug;
 
 // crates
 use rand::prelude::*;
 use serde::{Deserialize, Serialize};
 use tokio::sync::oneshot;
 use tokio_stream::StreamExt;
 use tracing::{error, span, Instrument, Level};
 // internal
 use backend::{DaSamplingServiceBackend, SamplingState};
 use kzgrs_backend::common::blob::DaBlob;
 use network::NetworkAdapter;
+use nomos_core::da::blob::Blob;
 use nomos_core::da::BlobId;
 use nomos_da_network_service::backends::libp2p::validator::SamplingEvent;
 use nomos_da_network_service::NetworkService;
+use nomos_storage::StorageService;
 use overwatch_rs::services::handle::ServiceStateHandle;
 use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::relay::{Relay, RelayMessage};
 use overwatch_rs::services::state::{NoOperator, NoState};
 use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
 use overwatch_rs::DynError;
+use storage::DaStorageAdapter;
 
 const DA_SAMPLING_TAG: ServiceId = "DA-Sampling";
 
@@ -40,15 +44,49 @@ pub enum DaSamplingServiceMsg<BlobId> {
     },
 }
 
+// TODO!!!
+// According to the spec, there are different error types which can be returned,
+// but this is based on the NomosDA Cryptographic Protocol spec saying
+// that a sampling call is made with a row and col variable,
+// so the Nomos Network Specification in turn says that three different
+// errors can be returned: ColNotFound, RowNotFound, and ColAndRowNotFound.
+#[derive(Debug)]
+pub enum DaSampleError {
+    BlobIdNotFound,
+}
+
+// TODO!!!
+// Ugly hack to make the two blob id types compatible...
+// Why can we not use the same ID objects resp. traits?
+// Do we really have to have associated types on all these services for
+// something as simple as a blob ID?
+pub struct CompatibilityBlob {
+    pub id: [u8; 32],
+}
+
+impl Blob for CompatibilityBlob {
+    type BlobId = [u8; 32];
+    type ColumnIndex = [u8; 2];
+
+    fn id(&self) -> Self::BlobId {
+        self.id
+    }
+
+    fn column_idx(&self) -> Self::ColumnIndex {
+        [0; 2]
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings> {
+pub struct DaSamplingServiceSettings<BackendSettings, NetworkSettings, StorageSettings> {
     pub sampling_settings: BackendSettings,
     pub network_adapter_settings: NetworkSettings,
+    pub storage_adapter_settings: StorageSettings,
 }
 
 impl<B: 'static> RelayMessage for DaSamplingServiceMsg<B> {}
 
-pub struct DaSamplingService<Backend, N, R>
+pub struct DaSamplingService<Backend, N, R, S>
 where
     R: SeedableRng + RngCore,
     Backend: DaSamplingServiceBackend<R> + Send,
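Aside: the CompatibilityBlob wrapper above only adapts a raw [u8; 32] to the Blob trait. Until the services share a single BlobId type, a From conversion would at least keep the hack in one place; a minimal sketch, not part of this commit:

    // Sketch: lets call sites write CompatibilityBlob::from(blob_id)
    // instead of building the struct by hand.
    impl From<[u8; 32]> for CompatibilityBlob {
        fn from(id: [u8; 32]) -> Self {
            Self { id }
        }
    }
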
@@ -57,19 +95,22 @@ where
     Backend::BlobId: Debug + 'static,
     N: NetworkAdapter,
     N::Settings: Clone,
+    S: DaStorageAdapter,
 {
     network_relay: Relay<NetworkService<N::Backend>>,
+    storage_relay: Relay<StorageService<S::Backend>>,
     service_state: ServiceStateHandle<Self>,
     sampler: Backend,
 }
 
-impl<Backend, N, R> DaSamplingService<Backend, N, R>
+impl<Backend, N, R, S> DaSamplingService<Backend, N, R, S>
 where
     R: SeedableRng + RngCore,
     Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + 'static,
     Backend::Settings: Clone,
     N: NetworkAdapter + Send + 'static,
     N::Settings: Clone,
+    S: DaStorageAdapter,
 {
     async fn should_stop_service(message: LifecycleMessage) -> bool {
         match message {
@@ -117,7 +158,11 @@ where
         }
     }
 
-    async fn handle_sampling_message(event: SamplingEvent, sampler: &mut Backend) {
+    async fn handle_sampling_message(
+        event: SamplingEvent,
+        sampler: &mut Backend,
+        storage_adapter: &mut S,
+    ) {
         match event {
             SamplingEvent::SamplingSuccess { blob_id, blob } => {
                 sampler.handle_sampling_success(blob_id, *blob).await;
@@ -129,15 +174,41 @@ where
                 }
                 error!("Error while sampling: {error}");
             }
+            // TODO!!!
+            // Are we really following the spec here?
+            // According to the NomosDA Cryptographic Protocol,
+            // a sampling client samples with col and row info,
+            // and the Nomos Network Specification adopted that same premise.
+            // But it rather looks like we just sample with the blob ID.
+            // Which is wrong, the spec(s) or the code?
             SamplingEvent::SamplingRequest {
                 blob_id,
                 response_sender,
-            } => todo!(),
+            } => {
+                // TODO!!!
+                // Ugly hack to make the two blob id types compatible...
+                // Why can we not use the same ID objects resp. traits?
+                // Do we really have to have associated types on all these
+                // services for something as simple as a blob ID?
+                let cb = CompatibilityBlob { id: blob_id };
+                let id = cb.id();
+                let dablob = storage_adapter.get_blob(id).await;
+                if let Ok(Some(dablob)) = dablob {
+                    // TODO: this still does not compile as-is; the blob id
+                    // and response types expected by the sender have yet to
+                    // be reconciled.
+                    let _ = response_sender.send(dablob);
+                } else {
+                    // TODO!!!
+                    // This is most probably not handled yet on the caller side!
+                    //response_sender.send(DaSampleError::BlobIdNotFound);
+                }
+            }
         }
     }
 }
 
-impl<Backend, N, R> ServiceData for DaSamplingService<Backend, N, R>
+impl<Backend, N, R, S> ServiceData for DaSamplingService<Backend, N, R, S>
 where
     R: SeedableRng + RngCore,
     Backend: DaSamplingServiceBackend<R> + Send,
@@ -146,22 +217,25 @@ where
     Backend::BlobId: Debug + 'static,
     N: NetworkAdapter,
     N::Settings: Clone,
+    S: DaStorageAdapter,
+    S::Settings: Send,
 {
     const SERVICE_ID: ServiceId = DA_SAMPLING_TAG;
-    type Settings = DaSamplingServiceSettings<Backend::Settings, N::Settings>;
+    type Settings = DaSamplingServiceSettings<Backend::Settings, N::Settings, S::Settings>;
     type State = NoState<Self::Settings>;
    type StateOperator = NoOperator<Self::State>;
     type Message = DaSamplingServiceMsg<Backend::BlobId>;
 }
 
 #[async_trait::async_trait]
-impl<Backend, N, R> ServiceCore for DaSamplingService<Backend, N, R>
+impl<Backend, N, R, S> ServiceCore for DaSamplingService<Backend, N, R, S>
 where
     R: SeedableRng + RngCore,
     Backend: DaSamplingServiceBackend<R, BlobId = BlobId, Blob = DaBlob> + Send + Sync + 'static,
     Backend::Settings: Clone + Send + Sync + 'static,
     N: NetworkAdapter + Send + Sync + 'static,
     N::Settings: Clone + Send + Sync + 'static,
+    S: DaStorageAdapter,
 {
     fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
         let DaSamplingServiceSettings {
@@ -169,10 +243,12 @@ where
         } = service_state.settings_reader.get_updated_settings();
 
         let network_relay = service_state.overwatch_handle.relay();
+        let storage_relay = service_state.overwatch_handle.relay();
         let rng = R::from_entropy();
 
         Ok(Self {
             network_relay,
+            storage_relay,
             service_state,
             sampler: Backend::new(sampling_settings, rng),
         })
@@ -185,13 +261,21 @@ where
         // position of his bls public key landing in the above-mentioned list.
         let Self {
             network_relay,
+            storage_relay,
             mut service_state,
             mut sampler,
         } = self;
-        let DaSamplingServiceSettings { .. } = service_state.settings_reader.get_updated_settings();
+        let DaSamplingServiceSettings {
+            storage_adapter_settings,
+            ..
+        } = service_state.settings_reader.get_updated_settings();
 
         let network_relay = network_relay.connect().await?;
+        let storage_relay = storage_relay.connect().await?;
 
         let mut network_adapter = N::new(network_relay).await;
+        let mut storage_adapter = S::new(storage_adapter_settings, storage_relay).await;
 
         let mut sampling_message_stream = network_adapter.listen_to_sampling_messages().await?;
         let mut next_prune_tick = sampler.prune_interval();
@@ -204,7 +288,7 @@ where
                     Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await;
                 }
                 Some(sampling_message) = sampling_message_stream.next() => {
-                    Self::handle_sampling_message(sampling_message, &mut sampler).await;
+                    Self::handle_sampling_message(sampling_message, &mut sampler, &mut storage_adapter).await;
                 }
                 Some(msg) = lifecycle_stream.next() => {
                     if Self::should_stop_service(msg).await {
--- /dev/null
+++ b/src/storage/adapters/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(feature = "rocksdb-backend")]
+pub mod rocksdb;
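Aside: because the rocksdb module sits behind the rocksdb-backend feature, downstream code that names the adapter needs the same gate. A hedged sketch (the crate name is assumed, not taken from this diff):

    // Crate name assumed; only the cfg-gating pattern is the point here.
    #[cfg(feature = "rocksdb-backend")]
    use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter;
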
--- /dev/null
+++ b/src/storage/adapters/rocksdb.rs
@@ -0,0 +1,82 @@
+// std
+use std::{marker::PhantomData, path::PathBuf};
+// crates
+use nomos_core::da::blob::Blob;
+use nomos_da_storage::rocksdb::{key_bytes, DA_VERIFIED_KEY_PREFIX};
+use nomos_storage::{
+    backends::{rocksdb::RocksBackend, StorageSerde},
+    StorageMsg, StorageService,
+};
+use overwatch_rs::{
+    services::{relay::OutboundRelay, ServiceData},
+    DynError,
+};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+// internal
+use crate::storage::DaStorageAdapter;
+
+pub struct RocksAdapter<A, B, S>
+where
+    S: StorageSerde + Send + Sync + 'static,
+{
+    settings: RocksAdapterSettings,
+    storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
+    _blob: PhantomData<B>,
+    _attestation: PhantomData<A>,
+}
+
+#[async_trait::async_trait]
+impl<A, B, S> DaStorageAdapter for RocksAdapter<A, B, S>
+where
+    A: Serialize + DeserializeOwned + Clone + Send + Sync,
+    B: Blob + Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
+    B::BlobId: AsRef<[u8]> + Send + Sync + 'static,
+    B::ColumnIndex: AsRef<[u8]> + Send + Sync + 'static,
+    S: StorageSerde + Send + Sync + 'static,
+{
+    type Backend = RocksBackend<S>;
+    type Blob = B;
+    type Settings = RocksAdapterSettings;
+
+    async fn new(
+        settings: Self::Settings,
+        storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
+    ) -> Self {
+        Self {
+            settings,
+            storage_relay,
+            _blob: PhantomData,
+            _attestation: PhantomData,
+        }
+    }
+
+    async fn get_blob(
+        &self,
+        blob_idx: <Self::Blob as Blob>::BlobId,
+    ) -> Result<Option<Self::Blob>, DynError> {
+        let key = key_bytes(DA_VERIFIED_KEY_PREFIX, blob_idx);
+        let (reply_channel, reply_rx) = tokio::sync::oneshot::channel();
+        self.storage_relay
+            .send(StorageMsg::Load { key, reply_channel })
+            .await
+            .expect("Failed to send load request to storage relay");
+
+        // TODO: Use storage backend ser/de functionality.
+        //
+        // Storage backend already handles ser/de, but lacks the ability to separate storage
+        // domains using prefixed keys. Once implemented, Indexer and Verifier can be simplified.
+        reply_rx
+            .await
+            .map(|maybe_bytes| {
+                maybe_bytes.map(|bytes| {
+                    S::deserialize(bytes).expect("Blob should be deserialized from bytes")
+                })
+            })
+            .map_err(|_| "Failed to receive blob from storage".into())
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RocksAdapterSettings {
+    pub blob_storage_directory: PathBuf,
+}
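Aside: a hedged sketch of how the new storage_adapter_settings field might be populated when wiring up the service; the sampling_settings and network_adapter_settings bindings and the directory value are illustrative, not taken from this diff:

    let settings = DaSamplingServiceSettings {
        sampling_settings,
        network_adapter_settings,
        storage_adapter_settings: RocksAdapterSettings {
            // Illustrative path; a real node would take this from its config.
            blob_storage_directory: "da/blobs".into(),
        },
    };
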
--- /dev/null
+++ b/src/storage/mod.rs
@@ -0,0 +1,25 @@
+pub mod adapters;
+
+use nomos_core::da::blob::Blob;
+use nomos_storage::{backends::StorageBackend, StorageService};
+use overwatch_rs::{
+    services::{relay::OutboundRelay, ServiceData},
+    DynError,
+};
+
+#[async_trait::async_trait]
+pub trait DaStorageAdapter {
+    type Backend: StorageBackend + Send + Sync + 'static;
+    type Settings: Clone + Send + Sync + 'static;
+    type Blob: Blob + Clone;
+
+    async fn new(
+        settings: Self::Settings,
+        storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
+    ) -> Self;
+
+    async fn get_blob(
+        &self,
+        blob_id: <Self::Blob as Blob>::BlobId,
+    ) -> Result<Option<Self::Blob>, DynError>;
+}
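Aside: get_blob distinguishes three outcomes: found, not found, and relay failure. A hedged sketch of a caller handling all three (the helper name is hypothetical; it mirrors the handling in handle_sampling_message above):

    // Hypothetical helper showing the three-state result of get_blob.
    async fn fetch_blob<S: DaStorageAdapter>(
        adapter: &S,
        id: <S::Blob as Blob>::BlobId,
    ) -> Option<S::Blob> {
        match adapter.get_blob(id).await {
            // Blob is present in verified storage.
            Ok(Some(blob)) => Some(blob),
            // Unknown blob id: the DaSampleError::BlobIdNotFound case.
            Ok(None) => None,
            // The storage relay itself failed.
            Err(e) => {
                tracing::error!("storage relay failed: {e}");
                None
            }
        }
    }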