DA: Implement base structure for verifier service (#627)

* Base cleaning of da to new traits/structure
Added new da protocols and types

* Implement base structure for verifier service

* Added comments and todo!

* Cleanup imports

* Size of VidCert in full replication

* Nomos Da Verifier service crate

* Extension replaced with metadata

* Fix DaIndexer service name

* Storage adapter trait in verifier

* Manage lifecycle and messages in verifier

* Blob trait in core

* Common nomos da storage crate

* Use updated nomos da storage in indexer

* Verifier storage adapter

* Libp2p adaper for verifier

* Kzgrs backend in verifier service

* Fix fmt

* Clippy happy

---------

Co-authored-by: Gusto <bacvinka@gmail.com>
This commit is contained in:
Daniel Sanchez 2024-05-20 12:15:02 +02:00 committed by GitHub
parent c7ffe8a3b8
commit 36e1826b86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 747 additions and 186 deletions

View File

@ -1,6 +1,9 @@
[workspace]
members = [
"nomos-core",
"nomos-da/kzgrs",
"nomos-da/kzgrs-backend",
"nomos-da/storage",
"nomos-libp2p",
"nomos-services/api",
"nomos-services/log",
@ -11,6 +14,7 @@ members = [
"nomos-services/metrics",
"nomos-services/system-sig",
"nomos-services/data-availability/indexer",
"nomos-services/data-availability/verifier",
"nomos-da/full-replication",
# TODO: add it again and reimplement full replication
# "nomos-cli",
@ -20,8 +24,6 @@ members = [
"consensus/carnot-engine",
"consensus/cryptarchia-engine",
"ledger/cryptarchia-ledger",
"tests",
"nomos-da/kzgrs",
"nomos-da/kzgrs-backend",
"tests"
]
resolver = "2"

View File

@ -47,7 +47,11 @@ pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, VidCertificate>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, VidCertificate, <VidCertificate as certificate::vid::VidCertificate>::CertificateId>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
FillSizeWithTx<MB16, Tx>,
@ -62,7 +66,11 @@ pub type TxMempool = TxMempoolService<
pub type DaMempool = DaMempoolService<
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MockPool<HeaderId, VidCertificate, <VidCertificate as certificate::vid::VidCertificate>::CertificateId>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolVerificationProvider,
>;

View File

@ -0,0 +1,7 @@
pub trait Blob {
type BlobId;
type ColumnId;
fn id(&self) -> Self::BlobId;
fn column_id(&self) -> Self::ColumnId;
}

View File

@ -21,6 +21,7 @@ impl<const SIZE: usize, B> FillSize<SIZE, B> {
impl<const SIZE: usize, C: VidCertificate> BlobCertificateSelect for FillSize<SIZE, C> {
type Certificate = C;
type Settings = ();
fn new(_settings: Self::Settings) -> Self {
@ -31,6 +32,9 @@ impl<const SIZE: usize, C: VidCertificate> BlobCertificateSelect for FillSize<SI
&self,
certificates: I,
) -> impl Iterator<Item = Self::Certificate> + 'i {
utils::select::select_from_till_fill_size::<SIZE, Self::Certificate>(|_| SIZE, certificates)
utils::select::select_from_till_fill_size::<SIZE, Self::Certificate>(
|c| c.size(),
certificates,
)
}
}

View File

@ -4,4 +4,5 @@ pub trait VidCertificate: Metadata {
type CertificateId;
fn certificate_id(&self) -> Self::CertificateId;
fn size(&self) -> usize;
}

View File

@ -3,6 +3,7 @@ use std::error::Error;
// internal
pub mod attestation;
pub mod blob;
pub mod certificate;
pub trait DaEncoder {
@ -13,7 +14,9 @@ pub trait DaEncoder {
pub trait DaVerifier {
type DaBlob;
type Attestation;
fn verify(&self, blob: Self::DaBlob) -> Result<Self::Attestation, impl Error>;
type Error;
fn verify(&self, blob: &Self::DaBlob) -> Result<Self::Attestation, Self::Error>;
}
pub trait DaDispersal {

View File

@ -27,27 +27,6 @@ pub mod openapi {
pub use super::Certificate;
}
#[derive(Debug, Clone)]
pub struct FullReplication<CertificateStrategy> {
voter: Voter,
certificate_strategy: CertificateStrategy,
output_buffer: Vec<Bytes>,
attestations: Vec<Attestation>,
output_certificate_buf: Vec<Certificate>,
}
impl<S> FullReplication<S> {
pub fn new(voter: Voter, strategy: S) -> Self {
Self {
voter,
certificate_strategy: strategy,
output_buffer: Vec::new(),
attestations: Vec::new(),
output_certificate_buf: Vec::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct AbsoluteNumber<A, C> {
num_attestations: usize,
@ -114,6 +93,12 @@ pub struct Metadata {
index: Index,
}
impl Metadata {
fn size(&self) -> usize {
std::mem::size_of_val(&self.app_id) + std::mem::size_of_val(&self.index)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Certificate {
@ -179,6 +164,10 @@ impl certificate::vid::VidCertificate for VidCertificate {
fn certificate_id(&self) -> Self::CertificateId {
self.id
}
fn size(&self) -> usize {
std::mem::size_of_val(&self.id) + self.metadata.size()
}
}
impl metadata::Metadata for VidCertificate {
@ -192,7 +181,9 @@ impl metadata::Metadata for VidCertificate {
impl Hash for VidCertificate {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(<VidCertificate as certificate::vid::VidCertificate>::certificate_id(self).as_ref());
state.write(
<VidCertificate as certificate::vid::VidCertificate>::certificate_id(self).as_ref(),
);
}
}

View File

@ -11,6 +11,7 @@ use kzgrs::Commitment;
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct Chunk(pub Vec<u8>);
pub struct Row(pub Vec<Chunk>);
#[derive(Clone)]
pub struct Column(pub Vec<Chunk>);
pub struct ChunksMatrix(pub Vec<Row>);
@ -18,10 +19,12 @@ impl Chunk {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn as_bytes(&self) -> Vec<u8> {
self.0.to_vec()
}
pub const fn empty() -> Self {
Self(vec![])
}
@ -40,6 +43,9 @@ impl Row {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn as_bytes(&self) -> Vec<u8> {
self.0.iter().flat_map(Chunk::as_bytes).collect()
}
@ -54,6 +60,9 @@ impl Column {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn as_bytes(&self) -> Vec<u8> {
self.0.iter().flat_map(Chunk::as_bytes).collect()
}
@ -87,6 +96,9 @@ impl ChunksMatrix {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn rows(&self) -> impl Iterator<Item = &Row> + '_ {
self.0.iter()
}

View File

@ -19,7 +19,7 @@ pub struct DaEncoderParams {
impl DaEncoderParams {
pub const MAX_BLS12_381_ENCODING_CHUNK_SIZE: usize = 31;
const fn default_with(column_count: usize) -> Self {
pub const fn default_with(column_count: usize) -> Self {
Self { column_count }
}
}

View File

@ -1,4 +1,4 @@
mod common;
mod encoder;
mod global;
mod verifier;
pub mod common;
pub mod encoder;
pub mod global;
pub mod verifier;

View File

@ -1,9 +1,8 @@
// std
// crates
use blst::min_sig::{PublicKey, SecretKey, Signature};
use blst::min_sig::{PublicKey, SecretKey};
use itertools::{izip, Itertools};
use num_bigint::BigUint;
use sha3::{Digest, Sha3_256};
// internal
@ -14,10 +13,11 @@ use crate::encoder::DaEncoderParams;
use crate::global::{DOMAIN, GLOBAL_PARAMETERS};
use kzgrs::common::field_element_from_bytes_le;
use kzgrs::{
bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment, FieldElement, Proof,
bytes_to_polynomial, commit_polynomial, verify_element_proof, Commitment, Proof,
BYTES_PER_FIELD_ELEMENT,
};
#[derive(Clone)]
pub struct DaBlob {
column: Column,
column_commitment: Commitment,
@ -163,10 +163,10 @@ impl DaVerifier {
mod test {
use crate::common::{hash_column_and_commitment, Chunk, Column};
use crate::encoder::test::{rand_data, ENCODER};
use crate::encoder::{DaEncoder, DaEncoderParams};
use crate::encoder::DaEncoderParams;
use crate::global::{DOMAIN, GLOBAL_PARAMETERS};
use crate::verifier::{DaBlob, DaVerifier};
use blst::min_sig::{PublicKey, SecretKey};
use blst::min_sig::SecretKey;
use kzgrs::{
bytes_to_polynomial, commit_polynomial, generate_element_proof, BYTES_PER_FIELD_ELEMENT,
};
@ -231,9 +231,9 @@ mod test {
let verifier = &verifiers[i];
let da_blob = DaBlob {
column,
column_commitment: encoded_data.column_commitments[i].clone(),
aggregated_column_commitment: encoded_data.aggregated_column_commitment.clone(),
aggregated_column_proof: encoded_data.aggregated_column_proofs[i].clone(),
column_commitment: encoded_data.column_commitments[i],
aggregated_column_commitment: encoded_data.aggregated_column_commitment,
aggregated_column_proof: encoded_data.aggregated_column_proofs[i],
rows_commitments: encoded_data.row_commitments.clone(),
rows_proofs: encoded_data
.rows_proofs

View File

@ -0,0 +1,10 @@
[package]
name = "nomos-da-storage"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.2"
hex = "0.4.3"
tokio = { version = "1", features = ["fs", "io-util"] }
tracing = "0.1"

View File

@ -0,0 +1,60 @@
// std
use std::path::PathBuf;
// crates
use bytes::Bytes;
use tokio::{
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncWriteExt},
};
// internal
// TODO: Rocksdb has a feature called BlobDB that handles largo blob storing, but further
// investigation needs to be done to see if rust wrapper supports it.
pub async fn load_blob(base_dir: PathBuf, blob_id: &[u8]) -> Option<Bytes> {
let blob_id = hex::encode(blob_id);
let mut path = base_dir;
path.push(blob_id);
let mut file = match File::open(path).await {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to open file: {}", e);
return None;
}
};
let mut contents = vec![];
if let Err(e) = file.read_to_end(&mut contents).await {
tracing::error!("Failed to read file: {}", e);
return None;
}
Some(Bytes::from(contents))
}
pub async fn write_blob(
base_dir: PathBuf,
blob_id: &[u8],
data: &[u8],
) -> Result<(), std::io::Error> {
let blob_id = hex::encode(blob_id);
let mut path = base_dir;
path.push(blob_id);
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let mut file = OpenOptions::new()
.write(true)
.create(true)
// In an unlikely scenario when a file already exists, rewrite the
// contents.
.truncate(true)
.open(path)
.await?;
file.write_all(data).await
}

View File

@ -0,0 +1,2 @@
pub mod fs;
pub mod rocksdb;

View File

@ -0,0 +1,13 @@
use bytes::{Bytes, BytesMut};
pub const DA_VID_KEY_PREFIX: &str = "da/vid/";
pub const DA_ATTESTED_KEY_PREFIX: &str = "da/attested/";
pub fn key_bytes(prefix: &str, id: impl AsRef<[u8]>) -> Bytes {
let mut buffer = BytesMut::new();
buffer.extend_from_slice(prefix.as_bytes());
buffer.extend_from_slice(id.as_ref());
buffer.freeze()
}

View File

@ -26,7 +26,11 @@ pub type Cryptarchia<Tx, SS, const SIZE: usize> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, Certificate>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, VidCertificate, <VidCertificate as certificate::vid::VidCertificate>::CertificateId>,
MockPool<
HeaderId,
VidCertificate,
<VidCertificate as certificate::vid::VidCertificate>::CertificateId,
>,
MempoolNetworkAdapter<Certificate, <Certificate as certificate::Certificate>::Id>,
MempoolVerificationProvider,
FillSizeWithTx<SIZE, Tx>,

View File

@ -1,74 +0,0 @@
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
use nomos_core::da::blob;
use nomos_core::header::HeaderId;
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaNetworkAdapter,
DaMsg, DataAvailabilityService,
};
use nomos_mempool::da::service::DaMempoolService;
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
tx::service::openapi::Status,
};
use nomos_mempool::{MempoolMetrics, MempoolMsg};
use tokio::sync::oneshot;
pub type MempoolServiceDa = DaMempoolService<
MempoolNetworkAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<HeaderId, Certificate, <Blob as blob::Blob>::Hash>,
>;
pub type DataAvailability = DataAvailabilityService<
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
BlobCache<<Blob as nomos_core::da::blob::Blob>::Hash, Blob>,
DaNetworkAdapter<Blob, Attestation>,
>;
pub async fn da_mempool_metrics(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
) -> Result<MempoolMetrics, super::DynError> {
let relay = handle.relay::<MempoolServiceDa>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Metrics {
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}
pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status<HeaderId>>, super::DynError> {
let relay = handle.relay::<MempoolServiceDa>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay
.send(MempoolMsg::Status {
items,
reply_channel: sender,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await.unwrap())
}
pub async fn da_blobs(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
ids: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Blob>, super::DynError> {
let relay = handle.relay::<DataAvailability>().connect().await?;
let (reply_channel, receiver) = oneshot::channel();
relay
.send(DaMsg::Get {
ids: Box::new(ids.into_iter()),
reply_channel,
})
.await
.map_err(|(e, _)| e)?;
Ok(receiver.await?)
}

View File

@ -40,6 +40,8 @@ use tokio::sync::{broadcast, oneshot};
use tokio_stream::wrappers::IntervalStream;
use tracing::{error, instrument};
type MempoolRelay<Payload, Item, Key> = OutboundRelay<MempoolMsg<HeaderId, Payload, Item, Key>>;
// Limit the number of blocks returned by GetHeaders
const HEADERS_LIMIT: usize = 512;
@ -565,7 +567,12 @@ where
)
.await;
mark_in_block(da_mempool_relay, block.blobs().map(VidCertificate::certificate_id), id).await;
mark_in_block(
da_mempool_relay,
block.blobs().map(VidCertificate::certificate_id),
id,
)
.await;
// store block
let msg = <StorageMsg<_>>::new_store_message(header.id(), block.clone());
@ -598,12 +605,8 @@ where
proof: LeaderProof,
tx_selector: TxS,
blob_selector: BS,
cl_mempool_relay: OutboundRelay<
MempoolMsg<HeaderId, ClPool::Item, ClPool::Item, ClPool::Key>,
>,
da_mempool_relay: OutboundRelay<
MempoolMsg<HeaderId, DaPoolAdapter::Payload, DaPool::Item, DaPool::Key>,
>,
cl_mempool_relay: MempoolRelay<ClPool::Item, ClPool::Item, ClPool::Key>,
da_mempool_relay: MempoolRelay<DaPoolAdapter::Payload, DaPool::Item, DaPool::Key>,
) -> Option<Block<ClPool::Item, DaPool::Item>> {
let mut output = None;
let cl_txs = get_mempool_contents(cl_mempool_relay);

View File

@ -9,14 +9,14 @@ edition = "2021"
async-trait = "0.1"
bytes = "1.2"
futures = "0.3"
hex = "0.4.3"
nomos-core = { path = "../../../nomos-core" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-storage = { path = "../../../nomos-services/storage" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
cryptarchia-consensus = { path = "../../../nomos-services/cryptarchia-consensus" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
tokio = { version = "1", features = ["sync", "fs", "io-util"] }
tokio = { version = "1", features = ["sync"] }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"
tokio-stream = "0.1.15"

View File

@ -183,7 +183,7 @@ where
DaStorage: DaStorageAdapter<VID = DaPool::Item, Blob = B>,
ConsensusStorage: StorageBackend + Send + Sync + 'static,
{
const SERVICE_ID: ServiceId = "DaStorage";
const SERVICE_ID: ServiceId = "DaIndexer";
type Settings = IndexerSettings<DaStorage::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;

View File

@ -1,12 +1,14 @@
use std::path::PathBuf;
use std::{marker::PhantomData, ops::Range};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::{stream::FuturesUnordered, Stream};
use nomos_core::da::certificate::{
metadata::{Metadata, Next},
vid::VidCertificate,
};
use nomos_da_storage::fs::load_blob;
use nomos_da_storage::rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX, DA_VID_KEY_PREFIX};
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
@ -15,13 +17,9 @@ use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};
use tokio::{fs::File, io::AsyncReadExt};
use crate::storage::DaStorageAdapter;
const DA_VID_KEY_PREFIX: &str = "da/vid/";
const DA_ATTESTED_KEY_PREFIX: &str = "da/attested/";
pub struct RocksAdapter<S, V>
where
S: StorageSerde + Send + Sync + 'static,
@ -65,7 +63,7 @@ where
// Remove item from attested list as it shouldn't be used again.
self.storage_relay
.send(StorageMsg::Remove {
.send(StorageMsg::Load {
key: attested_key,
reply_channel: reply_tx,
})
@ -129,10 +127,7 @@ where
futures.push(async move {
match reply_rx.await {
Ok(Some(id)) => (
idx,
load_blob(settings.blob_storage_directory, app_id.as_ref(), &id).await,
),
Ok(Some(id)) => (idx, load_blob(settings.blob_storage_directory, &id).await),
Ok(None) => (idx, None),
Err(_) => {
tracing::error!("Failed to receive storage response");
@ -148,42 +143,6 @@ where
}
}
fn key_bytes(prefix: &str, id: impl AsRef<[u8]>) -> Bytes {
let mut buffer = BytesMut::new();
buffer.extend_from_slice(prefix.as_bytes());
buffer.extend_from_slice(id.as_ref());
buffer.freeze()
}
// TODO: Rocksdb has a feature called BlobDB that handles largo blob storing, but further
// investigation needs to be done to see if rust wrapper supports it.
async fn load_blob(base_dir: PathBuf, app_id: &[u8], id: &[u8]) -> Option<Bytes> {
let app_id = hex::encode(app_id);
let id = hex::encode(id);
let mut path = base_dir;
path.push(app_id);
path.push(id);
let mut file = match File::open(path).await {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to open file: {}", e);
return None;
}
};
let mut contents = vec![];
if let Err(e) = file.read_to_end(&mut contents).await {
tracing::error!("Failed to read file: {}", e);
return None;
}
Some(Bytes::from(contents))
}
#[derive(Debug, Clone)]
pub struct RocksAdapterSettings {
pub blob_storage_directory: PathBuf,

View File

@ -1,5 +1,3 @@
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
@ -17,6 +15,7 @@ use full_replication::{Certificate, VidCertificate};
use nomos_core::da::certificate::vid::VidCertificate as _;
use nomos_core::da::certificate::CertificateStrategy;
use nomos_core::{da::certificate, header::HeaderId, tx::Transaction};
use nomos_da_storage::fs::write_blob;
use nomos_libp2p::{Multiaddr, Swarm, SwarmConfig};
use nomos_mempool::da::verify::fullreplication::DaVerificationProvider as MempoolVerificationProvider;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
@ -252,16 +251,13 @@ fn test_indexer() {
let range = 0.into()..1.into(); // get idx 0 and 1.
// Mock attestation step where blob is persisted in nodes blob storage.
let app_id_hex = hex::encode(app_id);
let id_hex = hex::encode(vid.certificate_id());
let mut path = blobs_dir;
path.push(app_id_hex);
std::fs::create_dir_all(&path).unwrap(); // app_id as dir
path.push(id_hex);
let mut file = File::create(path).unwrap();
file.write_all(b"blob").unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(write_blob(
blobs_dir,
vid.certificate_id().as_ref(),
b"blob",
))
.unwrap();
node1.spawn(async move {
let mempool_outbound = mempool.connect().await.unwrap();

View File

@ -0,0 +1,25 @@
[package]
name = "nomos-da-verifier"
version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
blst = "0.3.11"
bytes = "1.2"
futures = "0.3"
kzgrs-backend = { path = "../../../nomos-da/kzgrs-backend" }
nomos-core = { path = "../../../nomos-core" }
nomos-da-storage = { path = "../../../nomos-da/storage" }
nomos-network = { path = "../../../nomos-services/network" }
nomos-storage = { path = "../../../nomos-services/storage" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["sync", "macros"] }
tokio-stream = "0.1.15"
tracing = "0.1"
[features]
rocksdb-backend = ["nomos-storage/rocksdb-backend"]
libp2p = ["nomos-network/libp2p"]

View File

@ -0,0 +1,45 @@
use blst::{min_sig::PublicKey, min_sig::SecretKey};
// std
// crates
use kzgrs_backend::{
common::Attestation,
verifier::{DaBlob, DaVerifier as NomosKzgrsVerifier},
};
use nomos_core::da::DaVerifier;
use overwatch_rs::DynError;
use super::VerifierBackend;
// internal
pub struct KzgrsDaVerifier {
verifier: NomosKzgrsVerifier,
}
impl VerifierBackend for KzgrsDaVerifier {
type Settings = KzgrsDaVerifierSettings;
fn new(settings: Self::Settings) -> Self {
let verifier = NomosKzgrsVerifier::new(settings.sk, &settings.nodes_public_keys);
Self { verifier }
}
}
impl DaVerifier for KzgrsDaVerifier {
type DaBlob = DaBlob;
type Attestation = Attestation;
type Error = DynError;
fn verify(&self, blob: &Self::DaBlob) -> Result<Self::Attestation, DynError> {
let blob = blob.clone();
match self.verifier.verify(blob) {
Some(attestation) => Ok(attestation),
None => Err("Failed to attest the blob".into()),
}
}
}
// TODO: `sk` and `nodes_public_keys` need to be fetched from the params provider service.
pub struct KzgrsDaVerifierSettings {
pub sk: SecretKey,
pub nodes_public_keys: Vec<PublicKey>,
}

View File

@ -0,0 +1,8 @@
pub mod kzgrs;
use nomos_core::da::DaVerifier;
pub trait VerifierBackend: DaVerifier {
type Settings;
fn new(settings: Self::Settings) -> Self;
}

View File

@ -0,0 +1,205 @@
mod backend;
mod network;
mod storage;
// std
use nomos_storage::StorageService;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use std::error::Error;
use std::fmt::{Debug, Formatter};
use storage::DaStorageAdapter;
// crates
use tokio_stream::StreamExt;
use tracing::error;
// internal
use backend::VerifierBackend;
use network::NetworkAdapter;
use nomos_network::NetworkService;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
pub enum DaVerifierMsg<B> {
AddBlob { blob: B },
}
impl<B: 'static> Debug for DaVerifierMsg<B> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DaVerifierMsg::AddBlob { .. } => {
write!(f, "DaVerifierMsg::AddBlob")
}
}
}
}
impl<B: 'static> RelayMessage for DaVerifierMsg<B> {}
pub struct DaVerifierService<Backend, N, S>
where
Backend: VerifierBackend,
Backend::Settings: Clone,
Backend::DaBlob: 'static,
Backend::Error: Error,
N: NetworkAdapter,
N::Settings: Clone,
S: DaStorageAdapter,
{
network_relay: Relay<NetworkService<N::Backend>>,
service_state: ServiceStateHandle<Self>,
storage_relay: Relay<StorageService<S::Backend>>,
verifier: Backend,
}
impl<Backend, N, S> DaVerifierService<Backend, N, S>
where
Backend: VerifierBackend + Send + 'static,
Backend::DaBlob: Debug + Send,
Backend::Attestation: Debug + Send,
Backend::Error: Error + Send + Sync + 'static,
Backend::Settings: Clone,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation> + Send + 'static,
N::Settings: Clone,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation>
+ Send
+ 'static,
{
async fn handle_new_blob(
verifier: &Backend,
storage_adapter: &S,
blob: &Backend::DaBlob,
) -> Result<Backend::Attestation, DynError> {
if let Some(attestation) = storage_adapter.get_attestation(blob).await? {
Ok(attestation)
} else {
let attestation = verifier.verify(blob)?;
storage_adapter.add_blob(blob, &attestation).await?;
Ok(attestation)
}
}
async fn should_stop_service(message: LifecycleMessage) -> bool {
match message {
LifecycleMessage::Shutdown(sender) => {
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
}
impl<Backend, N, S> ServiceData for DaVerifierService<Backend, N, S>
where
Backend: VerifierBackend,
Backend::Settings: Clone,
Backend::Error: Error,
N: NetworkAdapter,
N::Settings: Clone,
S: DaStorageAdapter,
S::Settings: Clone,
{
const SERVICE_ID: ServiceId = "DaVerifier";
type Settings = DaVerifierServiceSettings<Backend::Settings, N::Settings, S::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = DaVerifierMsg<Backend::DaBlob>;
}
#[async_trait::async_trait]
impl<Backend, N, S> ServiceCore for DaVerifierService<Backend, N, S>
where
Backend: VerifierBackend + Send + Sync + 'static,
Backend::Settings: Clone + Send + Sync + 'static,
Backend::DaBlob: Debug + Send + Sync + 'static,
Backend::Attestation: Debug + Send,
Backend::Error: Error + Send + Sync,
N: NetworkAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation> + Send + 'static,
N::Settings: Clone + Send + Sync + 'static,
S: DaStorageAdapter<Blob = Backend::DaBlob, Attestation = Backend::Attestation>
+ Send
+ Sync
+ 'static,
S::Settings: Clone + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
let DaVerifierServiceSettings {
verifier_settings, ..
} = service_state.settings_reader.get_updated_settings();
let network_relay = service_state.overwatch_handle.relay();
let storage_relay = service_state.overwatch_handle.relay();
Ok(Self {
network_relay,
storage_relay,
service_state,
verifier: Backend::new(verifier_settings),
})
}
async fn run(self) -> Result<(), DynError> {
// This service will likely have to be modified later on.
// Most probably the verifier itself need to be constructed/update for every message with
// an updated list of the available nodes list, as it needs his own index coming from the
// position of his bls public key landing in the above-mentioned list.
let Self {
network_relay,
storage_relay,
mut service_state,
verifier,
} = self;
let DaVerifierServiceSettings {
network_adapter_settings,
storage_adapter_settings,
..
} = service_state.settings_reader.get_updated_settings();
let network_relay = network_relay.connect().await?;
let network_adapter = N::new(network_adapter_settings, network_relay).await;
let mut blob_stream = network_adapter.blob_stream().await;
let storage_relay = storage_relay.connect().await?;
let storage_adapter = S::new(storage_adapter_settings, storage_relay).await;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(blob) = blob_stream.next() => {
match Self::handle_new_blob(&verifier,&storage_adapter, &blob).await {
Ok(attestation) => if let Err(e) = network_adapter.send_attestation(attestation).await {
error!("Error replying attestation {e:?}");
},
Err(err) => error!("Error handling blob {blob:?} due to {err:?}"),
}
}
Some(msg) = service_state.inbound_relay.recv() => {
let DaVerifierMsg::AddBlob { blob } = msg;
if let Err(err) = Self::handle_new_blob(&verifier, &storage_adapter, &blob).await {
error!("Error handling blob {blob:?} due to {err:?}");
}
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
break;
}
}
}
}
Ok(())
}
}
#[derive(Clone)]
pub struct DaVerifierServiceSettings<BackendSettings, NetworkSettings, StorageSettings> {
verifier_settings: BackendSettings,
network_adapter_settings: NetworkSettings,
storage_adapter_settings: StorageSettings,
}

View File

@ -0,0 +1,107 @@
// std
use futures::Stream;
use overwatch_rs::DynError;
use std::marker::PhantomData;
// crates
// internal
use crate::network::NetworkAdapter;
use nomos_core::wire;
use nomos_network::backends::libp2p::{Command, Event, EventKind, Libp2p, Message, TopicHash};
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use tracing::debug;
pub const NOMOS_DA_TOPIC: &str = "NomosDa";
pub struct Libp2pAdapter<B, A> {
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
_blob: PhantomData<B>,
_attestation: PhantomData<A>,
}
impl<B, A> Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
async fn stream_for<E: DeserializeOwned>(&self) -> Box<dyn Stream<Item = E> + Unpin + Send> {
let topic_hash = TopicHash::from_raw(NOMOS_DA_TOPIC);
let (sender, receiver) = tokio::sync::oneshot::channel();
self.network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.expect("Network backend should be ready");
let receiver = receiver.await.unwrap();
Box::new(Box::pin(BroadcastStream::new(receiver).filter_map(
move |msg| match msg {
Ok(Event::Message(Message { topic, data, .. })) if topic == topic_hash => {
match wire::deserialize::<E>(&data) {
Ok(msg) => Some(msg),
Err(e) => {
debug!("Unrecognized message: {e}");
None
}
}
}
_ => None,
},
)))
}
async fn send<E: Serialize>(&self, data: E) -> Result<(), DynError> {
let message = wire::serialize(&data)?.into_boxed_slice();
self.network_relay
.send(NetworkMsg::Process(Command::Broadcast {
topic: NOMOS_DA_TOPIC.to_string(),
message,
}))
.await
.map_err(|(e, _)| Box::new(e) as DynError)
}
}
#[async_trait::async_trait]
impl<B, A> NetworkAdapter for Libp2pAdapter<B, A>
where
B: Serialize + DeserializeOwned + Send + Sync + 'static,
A: Serialize + DeserializeOwned + Send + Sync + 'static,
{
type Backend = Libp2p;
type Settings = ();
type Blob = B;
type Attestation = A;
async fn new(
_settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self {
network_relay
.send(NetworkMsg::Process(Command::Subscribe(
NOMOS_DA_TOPIC.to_string(),
)))
.await
.expect("Network backend should be ready");
Self {
network_relay,
_blob: Default::default(),
_attestation: Default::default(),
}
}
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send> {
self.stream_for::<Self::Blob>().await
}
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError> {
self.send(attestation).await
}
}

View File

@ -0,0 +1,2 @@
#[cfg(feature = "libp2p")]
pub mod libp2p;

View File

@ -0,0 +1,25 @@
pub mod adapters;
use futures::Stream;
use nomos_network::backends::NetworkBackend;
use nomos_network::NetworkService;
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
#[async_trait::async_trait]
pub trait NetworkAdapter {
type Backend: NetworkBackend + 'static;
type Settings;
type Blob;
type Attestation;
async fn new(
settings: Self::Settings,
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn blob_stream(&self) -> Box<dyn Stream<Item = Self::Blob> + Unpin + Send>;
async fn send_attestation(&self, attestation: Self::Attestation) -> Result<(), DynError>;
}

View File

@ -0,0 +1,2 @@
#[cfg(feature = "rocksdb-backend")]
pub mod rocksdb;

View File

@ -0,0 +1,111 @@
// std
use serde::{de::DeserializeOwned, Serialize};
use std::{marker::PhantomData, path::PathBuf};
// crates
use nomos_core::da::{attestation::Attestation, blob::Blob};
use nomos_da_storage::{
fs::write_blob,
rocksdb::{key_bytes, DA_ATTESTED_KEY_PREFIX},
};
use nomos_storage::{
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};
// internal
use crate::storage::DaStorageAdapter;
pub struct RocksAdapter<A, B, S>
where
S: StorageSerde + Send + Sync + 'static,
{
settings: RocksAdapterSettings,
storage_relay: OutboundRelay<StorageMsg<RocksBackend<S>>>,
_blob: PhantomData<B>,
_attestation: PhantomData<A>,
}
#[async_trait::async_trait]
impl<A, B, S> DaStorageAdapter for RocksAdapter<A, B, S>
where
A: Attestation + Serialize + DeserializeOwned + Clone + Send + Sync,
B: Blob + AsRef<[u8]> + Clone + Send + Sync + 'static,
B::BlobId: AsRef<[u8]> + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
{
type Backend = RocksBackend<S>;
type Blob = B;
type Attestation = A;
type Settings = RocksAdapterSettings;
async fn new(
settings: Self::Settings,
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self {
Self {
settings,
storage_relay,
_blob: PhantomData,
_attestation: PhantomData,
}
}
async fn add_blob(
&self,
blob: &Self::Blob,
attestation: &Self::Attestation,
) -> Result<(), DynError> {
write_blob(
self.settings.blob_storage_directory.clone(),
blob.id().as_ref(),
blob.as_ref(),
)
.await?;
// Mark blob as attested for lateer use in Indexer and attestation cache.
let blob_key = key_bytes(DA_ATTESTED_KEY_PREFIX, blob.id().as_ref());
self.storage_relay
.send(StorageMsg::Store {
key: blob_key,
value: S::serialize(attestation),
})
.await
.map_err(|(e, _)| e.into())
}
async fn get_attestation(
&self,
blob: &Self::Blob,
) -> Result<Option<Self::Attestation>, DynError> {
let attestation_key = key_bytes(DA_ATTESTED_KEY_PREFIX, blob.id().as_ref());
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.storage_relay
.send(StorageMsg::Load {
key: attestation_key,
reply_channel: reply_tx,
})
.await
.expect("Failed to send load request to storage relay");
// TODO: Use storage backend ser/de functionality.
//
// Storage backend already handles ser/de, but lacks the ability to seperate storage
// domains using prefixed keys. Once implemented Indexer and Verifier can be simplified.
reply_rx
.await
.map(|maybe_bytes| {
maybe_bytes.map(|bytes| {
S::deserialize(bytes).expect("Attestation should be deserialized from bytes")
})
})
.map_err(|_| "".into())
}
}
#[derive(Debug, Clone)]
pub struct RocksAdapterSettings {
pub blob_storage_directory: PathBuf,
}

View File

@ -0,0 +1,30 @@
pub mod adapters;
use nomos_storage::{backends::StorageBackend, StorageService};
use overwatch_rs::{
services::{relay::OutboundRelay, ServiceData},
DynError,
};
#[async_trait::async_trait]
pub trait DaStorageAdapter {
type Backend: StorageBackend + Send + Sync + 'static;
type Settings: Clone;
type Blob: Clone;
type Attestation: Clone;
async fn new(
settings: Self::Settings,
storage_relay: OutboundRelay<<StorageService<Self::Backend> as ServiceData>::Message>,
) -> Self;
async fn add_blob(
&self,
blob: &Self::Blob,
attestation: &Self::Attestation,
) -> Result<(), DynError>;
async fn get_attestation(
&self,
blob: &Self::Blob,
) -> Result<Option<Self::Attestation>, DynError>;
}