diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index ad83cb74..07aa7bdf 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -24,6 +24,7 @@ nomos-log = { path = "../../nomos-services/log" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p"] } nomos-http = { path = "../../nomos-services/http", features = ["http"] } nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-da = { path = "../../nomos-services/data-availability" } metrics = { path = "../../nomos-services/metrics", optional = true } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index b5964ca3..50fb100b 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -9,10 +9,12 @@ use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication}; use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter; +use bytes::Bytes; use nomos_consensus::CarnotConsensus; use nomos_core::{ da::{blob, certificate}, tx::Transaction, + wire, }; use nomos_da::{ backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter, @@ -28,6 +30,10 @@ use nomos_mempool::{ Transaction as TxDiscriminant, }; use nomos_network::backends::libp2p::Libp2p; +use nomos_storage::{ + backends::{sled::SledBackend, StorageSerde}, + StorageService, +}; use nomos_network::NetworkService; use overwatch_derive::*; @@ -38,6 +44,7 @@ use nomos_core::{ da::certificate::select::FillSize as FillSizeWithBlobsCertificate, tx::select::FillSize as FillSizeWithTx, }; +use serde::{de::DeserializeOwned, Serialize}; pub use tx::Tx; pub const CL_TOPIC: &str = "cl"; @@ -56,6 +63,7 @@ pub type Carnot = CarnotConsensus< TreeOverlay, FillSizeWithTx, FillSizeWithBlobsCertificate, + SledBackend, >; type DataAvailability = DataAvailabilityService< @@ -84,4 +92,19 @@ pub struct Nomos { #[cfg(feature = "metrics")] metrics: ServiceHandle>>, da: ServiceHandle, + storage: ServiceHandle>>, +} + +pub struct Wire; + +impl StorageSerde for Wire { + type Error = wire::Error; + + fn serialize(value: T) -> Bytes { + wire::serialize(&value).unwrap().into() + } + + fn deserialize(buff: Bytes) -> Result { + wire::deserialize(&buff) + } } diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index e2af5560..65d76e30 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -18,6 +18,8 @@ use nomos_network::backends::libp2p::Libp2p; use overwatch_rs::overwatch::*; use std::sync::Arc; +const DEFAULT_DB_PATH: &str = "./db"; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -99,6 +101,9 @@ fn main() -> Result<()> { #[cfg(feature = "metrics")] metrics: config.metrics, da: config.da, + storage: nomos_storage::backends::sled::SledBackendSettings { + db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), + }, }, None, ) diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index 56a17fc0..c9246aef 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -15,6 +15,7 @@ nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" } +nomos-storage = { path = "../storage" } rand_chacha = "0.3" rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 5787df2f..6d5b26d6 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -45,6 +45,7 @@ use nomos_mempool::{ MempoolMsg, MempoolService, Transaction as TxDiscriminant, }; use nomos_network::NetworkService; +use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService}; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -104,7 +105,7 @@ impl CarnotSettings { } } -pub struct CarnotConsensus +pub struct CarnotConsensus where A: NetworkAdapter, ClPoolAdapter: MempoolAdapter, @@ -119,6 +120,7 @@ where A::Backend: 'static, TxS: TxSelect, BS: BlobCertificateSelect, + Storage: StorageBackend + Send + Sync + 'static, { service_state: ServiceStateHandle, // underlying networking backend. We need this so we can relay and check the types properly @@ -126,11 +128,12 @@ where network_relay: Relay>, cl_mempool_relay: Relay>, da_mempool_relay: Relay>, + storage_relay: Relay>, _overlay: std::marker::PhantomData, } -impl ServiceData - for CarnotConsensus +impl ServiceData + for CarnotConsensus where A: NetworkAdapter, ClPool: MemPool, @@ -144,6 +147,7 @@ where O: Overlay + Debug, TxS: TxSelect, BS: BlobCertificateSelect, + Storage: StorageBackend + Send + Sync + 'static, { const SERVICE_ID: ServiceId = "Carnot"; type Settings = CarnotSettings; @@ -153,8 +157,8 @@ where } #[async_trait::async_trait] -impl ServiceCore - for CarnotConsensus +impl ServiceCore + for CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static, @@ -192,17 +196,20 @@ where TxS::Settings: Send + Sync + 'static, BS: BlobCertificateSelect + Clone + Send + Sync + 'static, BS::Settings: Send + Sync + 'static, + Storage: StorageBackend + Send + Sync + 'static, { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay(); + let storage_relay = service_state.overwatch_handle.relay(); Ok(Self { service_state, network_relay, _overlay: Default::default(), cl_mempool_relay, da_mempool_relay, + storage_relay, }) } @@ -225,6 +232,12 @@ where .await .expect("Relay connection with MemPoolService should succeed"); + let storage_relay: OutboundRelay<_> = self + .storage_relay + .connect() + .await + .expect("Relay connection with StorageService should succeed"); + let CarnotSettings { private_key, overlay_settings, @@ -310,6 +323,7 @@ where private_key, cl_mempool_relay.clone(), da_mempool_relay.clone(), + storage_relay.clone(), tx_selector.clone(), blob_selector.clone(), timeout, @@ -336,8 +350,8 @@ enum Output { }, } -impl - CarnotConsensus +impl + CarnotConsensus where A: NetworkAdapter + Clone + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static, @@ -373,6 +387,7 @@ where DaPool::Key: Debug + Send + Sync, ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, + Storage: StorageBackend + Send + Sync + 'static, { fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { @@ -402,6 +417,7 @@ where private_key: PrivateKey, cl_mempool_relay: OutboundRelay>, da_mempool_relay: OutboundRelay>, + storage_relay: OutboundRelay>, tx_selector: TxS, blobl_selector: BS, timeout: Duration, @@ -410,8 +426,15 @@ where let prev_view = carnot.current_view(); match event { Event::Proposal { block, stream } => { - (carnot, output) = - Self::process_block(carnot, block, stream, task_manager, adapter.clone()).await; + (carnot, output) = Self::process_block( + carnot, + block, + stream, + task_manager, + adapter.clone(), + storage_relay, + ) + .await; } Event::Approve { block, .. } => { tracing::debug!("approving proposal {:?}", block); @@ -486,13 +509,14 @@ where } #[allow(clippy::type_complexity)] - #[instrument(level = "debug", skip(adapter, task_manager, stream))] + #[instrument(level = "debug", skip(adapter, task_manager, stream, storage_relay))] async fn process_block( mut carnot: Carnot, block: Block, mut stream: Pin> + Send>>, task_manager: &mut TaskManager>, adapter: A, + storage_relay: OutboundRelay>, ) -> (Carnot, Option>) { tracing::debug!("received proposal {:?}", block); if carnot.highest_voted_view() >= block.header().view { @@ -502,6 +526,7 @@ where let original_block = block; let block = original_block.header().clone(); + let self_committee = carnot.self_committee(); let leader_committee = [carnot.id()].into_iter().collect(); @@ -518,6 +543,10 @@ where match carnot.receive_block(block.clone()) { Ok(mut new_state) => { let new_view = new_state.current_view(); + let msg = >::new_store_message(block.id, original_block.clone()); + if let Err((e, _msg)) = storage_relay.send(msg).await { + tracing::error!("Could not send block to storage: {e}"); + } if new_view != carnot.current_view() { task_manager.push( block.view, diff --git a/nomos-services/storage/src/backends/sled.rs b/nomos-services/storage/src/backends/sled.rs index 4bfeee57..0afba591 100644 --- a/nomos-services/storage/src/backends/sled.rs +++ b/nomos-services/storage/src/backends/sled.rs @@ -19,10 +19,10 @@ pub enum Error { } /// Sled backend setting -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct SledBackendSettings { /// File path to the db file - db_path: PathBuf, + pub db_path: PathBuf, } /// Sled transaction type