Save safe blocks contents to storage (#464)

This commit is contained in:
Giacomo Pasini 2023-10-17 11:04:47 +02:00 committed by GitHub
parent b997011c6a
commit f90704f1d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 12 deletions

View File

@ -24,6 +24,7 @@ nomos-log = { path = "../../nomos-services/log" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] }
nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] }
nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p"] } nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-libp2p = { path = "../../nomos-libp2p" } nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-da = { path = "../../nomos-services/data-availability" } nomos-da = { path = "../../nomos-services/data-availability" }
metrics = { path = "../../nomos-services/metrics", optional = true } metrics = { path = "../../nomos-services/metrics", optional = true }

View File

@ -9,10 +9,12 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication};
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
use bytes::Bytes;
use nomos_consensus::CarnotConsensus; use nomos_consensus::CarnotConsensus;
use nomos_core::{ use nomos_core::{
da::{blob, certificate}, da::{blob, certificate},
tx::Transaction, tx::Transaction,
wire,
}; };
use nomos_da::{ use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
@ -28,6 +30,10 @@ use nomos_mempool::{
Transaction as TxDiscriminant, Transaction as TxDiscriminant,
}; };
use nomos_network::backends::libp2p::Libp2p; use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
StorageService,
};
use nomos_network::NetworkService; use nomos_network::NetworkService;
use overwatch_derive::*; use overwatch_derive::*;
@ -38,6 +44,7 @@ use nomos_core::{
da::certificate::select::FillSize as FillSizeWithBlobsCertificate, da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
tx::select::FillSize as FillSizeWithTx, tx::select::FillSize as FillSizeWithTx,
}; };
use serde::{de::DeserializeOwned, Serialize};
pub use tx::Tx; pub use tx::Tx;
pub const CL_TOPIC: &str = "cl"; pub const CL_TOPIC: &str = "cl";
@ -56,6 +63,7 @@ pub type Carnot = CarnotConsensus<
TreeOverlay<RoundRobin, RandomBeaconState>, TreeOverlay<RoundRobin, RandomBeaconState>,
FillSizeWithTx<MB16, Tx>, FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, Certificate>, FillSizeWithBlobsCertificate<MB16, Certificate>,
SledBackend<Wire>,
>; >;
type DataAvailability = DataAvailabilityService< type DataAvailability = DataAvailabilityService<
@ -84,4 +92,19 @@ pub struct Nomos {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>, metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
da: ServiceHandle<DataAvailability>, da: ServiceHandle<DataAvailability>,
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
}
pub struct Wire;
impl StorageSerde for Wire {
type Error = wire::Error;
fn serialize<T: Serialize>(value: T) -> Bytes {
wire::serialize(&value).unwrap().into()
}
fn deserialize<T: DeserializeOwned>(buff: Bytes) -> Result<T, Self::Error> {
wire::deserialize(&buff)
}
} }

View File

@ -18,6 +18,8 @@ use nomos_network::backends::libp2p::Libp2p;
use overwatch_rs::overwatch::*; use overwatch_rs::overwatch::*;
use std::sync::Arc; use std::sync::Arc;
const DEFAULT_DB_PATH: &str = "./db";
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
@ -99,6 +101,9 @@ fn main() -> Result<()> {
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics: config.metrics, metrics: config.metrics,
da: config.da, da: config.da,
storage: nomos_storage::backends::sled::SledBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
},
}, },
None, None,
) )

View File

@ -15,6 +15,7 @@ nomos-network = { path = "../network" }
nomos-mempool = { path = "../mempool" } nomos-mempool = { path = "../mempool" }
nomos-core = { path = "../../nomos-core" } nomos-core = { path = "../../nomos-core" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" }
nomos-storage = { path = "../storage" }
rand_chacha = "0.3" rand_chacha = "0.3"
rand = "0.8" rand = "0.8"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View File

@ -45,6 +45,7 @@ use nomos_mempool::{
MempoolMsg, MempoolService, Transaction as TxDiscriminant, MempoolMsg, MempoolService, Transaction as TxDiscriminant,
}; };
use nomos_network::NetworkService; use nomos_network::NetworkService;
use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
use overwatch_rs::services::{ use overwatch_rs::services::{
handle::ServiceStateHandle, handle::ServiceStateHandle,
@ -104,7 +105,7 @@ impl<O: Overlay, Ts, Bs> CarnotSettings<O, Ts, Bs> {
} }
} }
pub struct CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> pub struct CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where where
A: NetworkAdapter, A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>, ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
@ -119,6 +120,7 @@ where
A::Backend: 'static, A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>, TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>, BS: BlobCertificateSelect<Certificate = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
{ {
service_state: ServiceStateHandle<Self>, service_state: ServiceStateHandle<Self>,
// underlying networking backend. We need this so we can relay and check the types properly // underlying networking backend. We need this so we can relay and check the types properly
@ -126,11 +128,12 @@ where
network_relay: Relay<NetworkService<A::Backend>>, network_relay: Relay<NetworkService<A::Backend>>,
cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>, cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>, da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
storage_relay: Relay<StorageService<Storage>>,
_overlay: std::marker::PhantomData<O>, _overlay: std::marker::PhantomData<O>,
} }
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> ServiceData impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> ServiceData
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where where
A: NetworkAdapter, A: NetworkAdapter,
ClPool: MemPool, ClPool: MemPool,
@ -144,6 +147,7 @@ where
O: Overlay + Debug, O: Overlay + Debug,
TxS: TxSelect<Tx = ClPool::Item>, TxS: TxSelect<Tx = ClPool::Item>,
BS: BlobCertificateSelect<Certificate = DaPool::Item>, BS: BlobCertificateSelect<Certificate = DaPool::Item>,
Storage: StorageBackend + Send + Sync + 'static,
{ {
const SERVICE_ID: ServiceId = "Carnot"; const SERVICE_ID: ServiceId = "Carnot";
type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>; type Settings = CarnotSettings<O, TxS::Settings, BS::Settings>;
@ -153,8 +157,8 @@ where
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> ServiceCore impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> ServiceCore
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static,
@ -192,17 +196,20 @@ where
TxS::Settings: Send + Sync + 'static, TxS::Settings: Send + Sync + 'static,
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static, BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
BS::Settings: Send + Sync + 'static, BS::Settings: Send + Sync + 'static,
Storage: StorageBackend + Send + Sync + 'static,
{ {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> { fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay(); let network_relay = service_state.overwatch_handle.relay();
let cl_mempool_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay();
let storage_relay = service_state.overwatch_handle.relay();
Ok(Self { Ok(Self {
service_state, service_state,
network_relay, network_relay,
_overlay: Default::default(), _overlay: Default::default(),
cl_mempool_relay, cl_mempool_relay,
da_mempool_relay, da_mempool_relay,
storage_relay,
}) })
} }
@ -225,6 +232,12 @@ where
.await .await
.expect("Relay connection with MemPoolService should succeed"); .expect("Relay connection with MemPoolService should succeed");
let storage_relay: OutboundRelay<_> = self
.storage_relay
.connect()
.await
.expect("Relay connection with StorageService should succeed");
let CarnotSettings { let CarnotSettings {
private_key, private_key,
overlay_settings, overlay_settings,
@ -310,6 +323,7 @@ where
private_key, private_key,
cl_mempool_relay.clone(), cl_mempool_relay.clone(),
da_mempool_relay.clone(), da_mempool_relay.clone(),
storage_relay.clone(),
tx_selector.clone(), tx_selector.clone(),
blob_selector.clone(), blob_selector.clone(),
timeout, timeout,
@ -336,8 +350,8 @@ enum Output<Tx: Clone + Eq + Hash, BlobCertificate: Clone + Eq + Hash> {
}, },
} }
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS> CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where where
A: NetworkAdapter + Clone + Send + Sync + 'static, A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static,
@ -373,6 +387,7 @@ where
DaPool::Key: Debug + Send + Sync, DaPool::Key: Debug + Send + Sync,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static, ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static, DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
Storage: StorageBackend + Send + Sync + 'static,
{ {
fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) { fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
match msg { match msg {
@ -402,6 +417,7 @@ where
private_key: PrivateKey, private_key: PrivateKey,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>, cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>, da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
tx_selector: TxS, tx_selector: TxS,
blobl_selector: BS, blobl_selector: BS,
timeout: Duration, timeout: Duration,
@ -410,8 +426,15 @@ where
let prev_view = carnot.current_view(); let prev_view = carnot.current_view();
match event { match event {
Event::Proposal { block, stream } => { Event::Proposal { block, stream } => {
(carnot, output) = (carnot, output) = Self::process_block(
Self::process_block(carnot, block, stream, task_manager, adapter.clone()).await; carnot,
block,
stream,
task_manager,
adapter.clone(),
storage_relay,
)
.await;
} }
Event::Approve { block, .. } => { Event::Approve { block, .. } => {
tracing::debug!("approving proposal {:?}", block); tracing::debug!("approving proposal {:?}", block);
@ -486,13 +509,14 @@ where
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[instrument(level = "debug", skip(adapter, task_manager, stream))] #[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))]
async fn process_block( async fn process_block(
mut carnot: Carnot<O>, mut carnot: Carnot<O>,
block: Block<ClPool::Item, DaPool::Item>, block: Block<ClPool::Item, DaPool::Item>,
mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>, mut stream: Pin<Box<dyn Stream<Item = Block<ClPool::Item, DaPool::Item>> + Send>>,
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>, task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A, adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) { ) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view { if carnot.highest_voted_view() >= block.header().view {
@ -502,6 +526,7 @@ where
let original_block = block; let original_block = block;
let block = original_block.header().clone(); let block = original_block.header().clone();
let self_committee = carnot.self_committee(); let self_committee = carnot.self_committee();
let leader_committee = [carnot.id()].into_iter().collect(); let leader_committee = [carnot.id()].into_iter().collect();
@ -518,6 +543,10 @@ where
match carnot.receive_block(block.clone()) { match carnot.receive_block(block.clone()) {
Ok(mut new_state) => { Ok(mut new_state) => {
let new_view = new_state.current_view(); let new_view = new_state.current_view();
let msg = <StorageMsg<_>>::new_store_message(block.id, original_block.clone());
if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}");
}
if new_view != carnot.current_view() { if new_view != carnot.current_view() {
task_manager.push( task_manager.push(
block.view, block.view,

View File

@ -19,10 +19,10 @@ pub enum Error {
} }
/// Sled backend setting /// Sled backend setting
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct SledBackendSettings { pub struct SledBackendSettings {
/// File path to the db file /// File path to the db file
db_path: PathBuf, pub db_path: PathBuf,
} }
/// Sled transaction type /// Sled transaction type