From 75b36020c2da73b9938ee7d1d8313c2ed622a2f4 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Wed, 25 Oct 2023 12:10:21 +0200 Subject: [PATCH] Lifecycle update and implementations (#457) * Update deps * Implement base lifecycle handling in network service * Implement base lifecycle handling in storage service * Use methods instead of functions * Pipe lifecycle in metrics service * Pipe lifecycle in mempool service * Pipe lifecycle in log service * Pipe lifecycle in da service * Pipe lifecycle in consensus service * Refactor handling of lifecycle message to should_stop_service * Update overwatch version to fixed run_all one --- nodes/mixnode/Cargo.toml | 4 +- nodes/nomos-node/Cargo.toml | 4 +- nomos-cli/Cargo.toml | 4 +- nomos-http-api/Cargo.toml | 4 +- nomos-services/consensus/Cargo.toml | 2 +- nomos-services/consensus/src/lib.rs | 26 ++- nomos-services/data-availability/Cargo.toml | 2 +- nomos-services/data-availability/src/lib.rs | 131 +++++++++------ nomos-services/http/Cargo.toml | 4 +- nomos-services/log/Cargo.toml | 2 +- nomos-services/log/src/lib.rs | 47 +++++- nomos-services/mempool/Cargo.toml | 4 +- nomos-services/mempool/src/lib.rs | 167 +++++++++++++------- nomos-services/metrics/Cargo.toml | 6 +- nomos-services/metrics/src/lib.rs | 80 +++++++--- nomos-services/network/Cargo.toml | 2 +- nomos-services/network/src/lib.rs | 77 ++++++--- nomos-services/storage/Cargo.toml | 3 +- nomos-services/storage/src/lib.rs | 68 ++++++-- tests/Cargo.toml | 2 +- 20 files changed, 450 insertions(+), 189 deletions(-) diff --git a/nodes/mixnode/Cargo.toml b/nodes/mixnode/Cargo.toml index 37ab350e..e6e5b1cb 100644 --- a/nodes/mixnode/Cargo.toml +++ b/nodes/mixnode/Cargo.toml @@ -9,8 +9,8 @@ mixnet-node = { path = "../../mixnet/node" } nomos-log = { path = "../../nomos-services/log" } clap = { version = "4", features = ["derive"] } color-eyre = "0.6.0" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } serde = "1" serde_yaml = "0.9" tracing = "0.1" diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 11da550c..5e028be2 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -14,8 +14,8 @@ chrono = "0.4" futures = "0.3" http = "0.2.9" hex = "0.4.3" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" multiaddr = "0.18" nomos-core = { path = "../../nomos-core" } diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index f89df15a..2fb2b4da 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -15,8 +15,8 @@ clap = {version = "4", features = ["derive"] } serde_yaml = "0.9" futures = "0.3" tokio = { version = "1", features = ["sync"] } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = 
"ac28d01" } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] } nomos-libp2p = { path = "../nomos-libp2p"} diff --git a/nomos-http-api/Cargo.toml b/nomos-http-api/Cargo.toml index 88ef6ab7..03e31bd6 100644 --- a/nomos-http-api/Cargo.toml +++ b/nomos-http-api/Cargo.toml @@ -8,8 +8,8 @@ axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"] [dependencies] async-trait = "0.1" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tracing = "0.1" utoipa = "4.0" utoipa-swagger-ui = { version = "4.0" } diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index c9246aef..0c564def 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -14,7 +14,7 @@ futures = "0.3" nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core" } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-storage = { path = "../storage" } rand_chacha = "0.3" rand = "0.8" diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 6d5b26d6..5cd8bb19 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -19,7 +19,7 @@ use serde::Deserialize; use serde::{de::DeserializeOwned, Serialize}; use serde_with::serde_as; use tokio::sync::oneshot::Sender; -use tracing::instrument; +use tracing::{error, instrument}; // internal use crate::network::messages::{ NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, @@ -46,6 +46,7 @@ use nomos_mempool::{ }; use nomos_network::NetworkService; use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -311,7 +312,7 @@ where Event::ProposeBlock { qc } }); } - + let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); loop { tokio::select! 
{ Some(event) = task_manager.next() => { @@ -333,8 +334,14 @@ where Some(msg) = self.service_state.inbound_relay.next() => { Self::process_message(&carnot, msg); } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } } } + Ok(()) } } @@ -389,6 +396,21 @@ where DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, Storage: StorageBackend + Send + Sync + 'static, { + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { ConsensusMsg::Info { tx } => { diff --git a/nomos-services/data-availability/Cargo.toml b/nomos-services/data-availability/Cargo.toml index 0b99aa02..d1cfcadd 100644 --- a/nomos-services/data-availability/Cargo.toml +++ b/nomos-services/data-availability/Cargo.toml @@ -11,7 +11,7 @@ futures = "0.3" moka = { version = "0.11", features = ["future"] } nomos-core = { path = "../../nomos-core" } nomos-network = { path = "../network" } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = "1.0" tracing = "0.1" tokio = { version = "1", features = ["sync", "macros"] } diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index b0ebc255..edffb8fb 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -14,9 +14,11 @@ use crate::network::NetworkAdapter; use nomos_core::da::{blob::Blob, DaProtocol}; use nomos_network::NetworkService; use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use tracing::error; pub struct DataAvailabilityService where @@ -70,6 +72,76 @@ where type Message = DaMsg; } +impl DataAvailabilityService +where + Protocol: DaProtocol + Send + Sync, + Backend: DaBackend + Send + Sync, + Protocol::Settings: Clone + Send + Sync + 'static, + Protocol::Blob: 'static, + Backend::Settings: Clone + Send + Sync + 'static, + Protocol::Blob: Send, + Protocol::Attestation: Send, + ::Hash: Debug + Send + Sync, + Network: + NetworkAdapter + Send + Sync, +{ + async fn handle_new_blob( + da: &Protocol, + backend: &Backend, + adapter: &Network, + blob: Protocol::Blob, + ) -> Result<(), DaError> { + // we need to handle the reply (verification + signature) + let attestation = da.attest(&blob); + backend.add_blob(blob).await?; + // we do not call `da.recv_blob` here because that is meant to + // be called to retrieve the original data, while here we're only interested + // in storing the blob. + // We might want to refactor the backend to be part of implementations of the + // Da protocol instead of this service and clear this confusion. 
+ adapter + .send_attestation(attestation) + .await + .map_err(DaError::Dyn) + } + + async fn handle_da_msg(backend: &Backend, msg: DaMsg) -> Result<(), DaError> { + match msg { + DaMsg::RemoveBlobs { blobs } => { + futures::stream::iter(blobs) + .for_each_concurrent(None, |blob| async move { + if let Err(e) = backend.remove_blob(&blob).await { + tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); + } + }) + .await; + } + DaMsg::Get { ids, reply_channel } => { + let res = ids.filter_map(|id| backend.get_blob(&id)).collect(); + if reply_channel.send(res).is_err() { + tracing::error!("Could not returns blobs"); + } + } + } + Ok(()) + } + + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } +} + #[async_trait::async_trait] impl ServiceCore for DataAvailabilityService where @@ -111,71 +183,30 @@ where let adapter = Network::new(network_relay).await; let mut network_blobs = adapter.blob_stream().await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { tokio::select! { Some(blob) = network_blobs.next() => { - if let Err(e) = handle_new_blob(&da, &backend, &adapter, blob).await { + if let Err(e) = Self::handle_new_blob(&da, &backend, &adapter, blob).await { tracing::debug!("Failed to add a new received blob: {e:?}"); } } Some(msg) = service_state.inbound_relay.recv() => { - if let Err(e) = handle_da_msg(&backend, msg).await { + if let Err(e) = Self::handle_da_msg(&backend, msg).await { tracing::debug!("Failed to handle da msg: {e:?}"); } } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } } } + Ok(()) } } -async fn handle_new_blob< - Protocol: DaProtocol, - Backend: DaBackend, - A: NetworkAdapter, ->( - da: &Protocol, - backend: &Backend, - adapter: &A, - blob: Protocol::Blob, -) -> Result<(), DaError> { - // we need to handle the reply (verification + signature) - let attestation = da.attest(&blob); - backend.add_blob(blob).await?; - // we do not call `da.recv_blob` here because that is meant to - // be called to retrieve the original data, while here we're only interested - // in storing the blob. - // We might want to refactor the backend to be part of implementations of the - // Da protocol instead of this service and clear this confusion. 
- adapter - .send_attestation(attestation) - .await - .map_err(DaError::Dyn) -} - -async fn handle_da_msg(backend: &B, msg: DaMsg) -> Result<(), DaError> -where - ::Hash: Debug, -{ - match msg { - DaMsg::RemoveBlobs { blobs } => { - futures::stream::iter(blobs) - .for_each_concurrent(None, |blob| async move { - if let Err(e) = backend.remove_blob(&blob).await { - tracing::debug!("Could not remove blob {blob:?} due to: {e:?}"); - } - }) - .await; - } - DaMsg::Get { ids, reply_channel } => { - let res = ids.filter_map(|id| backend.get_blob(&id)).collect(); - if reply_channel.send(res).is_err() { - tracing::error!("Could not returns blobs"); - } - } - } - Ok(()) -} - #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct Settings { pub da_protocol: P, diff --git a/nomos-services/http/Cargo.toml b/nomos-services/http/Cargo.toml index 1bd90774..fc1dec05 100644 --- a/nomos-services/http/Cargo.toml +++ b/nomos-services/http/Cargo.toml @@ -25,8 +25,8 @@ clap = { version = "4", features = ["derive", "env"], optional = true } futures = "0.3" http = "0.2.9" hyper = { version = "0.14", optional = true } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } parking_lot = { version = "0.12", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", optional = true } diff --git a/nomos-services/log/Cargo.toml b/nomos-services/log/Cargo.toml index f573f35f..a7c8cde5 100644 --- a/nomos-services/log/Cargo.toml +++ b/nomos-services/log/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] async-trait = "0.1" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = { version = "1.0", features = ["derive"] } tracing = "0.1" tracing-appender = "0.2" diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index 227af87b..46f2115f 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -1,12 +1,14 @@ // std +use futures::StreamExt; use std::net::SocketAddr; use std::path::PathBuf; // crates use serde::{Deserialize, Serialize}; -use tracing::Level; +use tracing::{error, Level}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{filter::LevelFilter, prelude::*}; // internal +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::NoMessage, @@ -14,7 +16,10 @@ use overwatch_rs::services::{ ServiceCore, ServiceData, }; -pub struct Logger(Option); +pub struct Logger { + service_state: ServiceStateHandle, + worker_guard: Option, +} #[derive(Clone, Debug, Serialize, Deserialize)] pub enum LoggerBackend { @@ -66,10 +71,10 @@ pub enum LoggerFormat { impl ServiceData for Logger { const SERVICE_ID: &'static str = "Logger"; + type Settings = LoggerSettings; type State = NoState; type StateOperator = NoOperator; type Message = NoMessage; - type Settings = LoggerSettings; } // a macro and not a function because it's a bit of a type @@ -102,7 +107,10 @@ impl ServiceCore for Logger { .runtime() .spawn(async move { task.connect().await }); registry_init!(layer, config.format, config.level); - return Ok(Self(None)); + return Ok(Self { + 
service_state, + worker_guard: None, + }); } LoggerBackend::File { directory, prefix } => { let file_appender = tracing_appender::rolling::hourly( @@ -119,12 +127,39 @@ impl ServiceCore for Logger { .with_level(true) .with_writer(non_blocking); registry_init!(layer, config.format, config.level); - Ok(Self(Some(_guard))) + Ok(Self { + service_state, + worker_guard: Some(_guard), + }) } async fn run(self) -> Result<(), overwatch_rs::DynError> { + let Self { + service_state, + worker_guard, + } = self; // keep the handle alive without stressing the runtime - futures::pending!(); + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + loop { + if let Some(msg) = lifecycle_stream.next().await { + match msg { + LifecycleMessage::Shutdown(sender) => { + // flush pending logs before signaling message processing + drop(worker_guard); + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + break; + } + LifecycleMessage::Kill => { + break; + } + } + } + } Ok(()) } } diff --git a/nomos-services/mempool/Cargo.toml b/nomos-services/mempool/Cargo.toml index f6143099..fa3b0ebf 100644 --- a/nomos-services/mempool/Cargo.toml +++ b/nomos-services/mempool/Cargo.toml @@ -12,7 +12,7 @@ futures = "0.3" linked-hash-map = { version = "0.5.6", optional = true } nomos-network = { path = "../network" } nomos-core = { path = "../../nomos-core" } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } rand = { version = "0.8", optional = true } serde = { version = "1.0", features = ["derive"] } thiserror = "1.0" @@ -23,7 +23,7 @@ chrono = "0.4" [dev-dependencies] nomos-log = { path = "../log" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } tokio = { version = "1", features = ["full"] } blake2 = "0.10" diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index 37c3ffa2..fe4bc9dd 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -14,13 +14,15 @@ use tokio::sync::oneshot::Sender; use crate::network::NetworkAdapter; use backend::{MemPool, Status}; use nomos_core::block::BlockId; -use nomos_network::NetworkService; +use nomos_network::{NetworkMsg, NetworkService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::{OutboundRelay, Relay, RelayMessage}, state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; +use tracing::error; pub struct MempoolService where @@ -168,7 +170,7 @@ where .. } = self; - let network_relay: OutboundRelay<_> = network_relay + let mut network_relay: OutboundRelay<_> = network_relay .connect() .await .expect("Relay connection with NetworkService should succeed"); @@ -180,69 +182,124 @@ where let adapter = adapter.await; let mut network_items = adapter.transactions_stream().await; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { tokio::select! 
{ Some(msg) = service_state.inbound_relay.recv() => { - match msg { - MempoolMsg::Add { item, key, reply_channel } => { - match pool.add_item(key, item.clone()) { - Ok(_id) => { - // Broadcast the item to the network - let net = network_relay.clone(); - let settings = service_state.settings_reader.get_updated_settings().network; - // move sending to a new task so local operations can complete in the meantime - tokio::spawn(async move { - let adapter = N::new(settings, net).await; - adapter.send(item).await; - }); - if let Err(e) = reply_channel.send(Ok(())) { - tracing::debug!("Failed to send reply to AddTx: {:?}", e); - } - } - Err(e) => { - tracing::debug!("could not add tx to the pool due to: {}", e); - } - } - } - MempoolMsg::View { ancestor_hint, reply_channel } => { - reply_channel.send(pool.view(ancestor_hint)).unwrap_or_else(|_| { - tracing::debug!("could not send back pool view") - }); - } - MempoolMsg::MarkInBlock { ids, block } => { - pool.mark_in_block(&ids, block); - } - #[cfg(test)] - MempoolMsg::BlockItems { block, reply_channel } => { - reply_channel.send(pool.block_items(block)).unwrap_or_else(|_| { - tracing::debug!("could not send back block items") - }); - } - MempoolMsg::Prune { ids } => { pool.prune(&ids); }, - MempoolMsg::Metrics { reply_channel } => { - let metrics = MempoolMetrics { - pending_items: pool.pending_item_count(), - last_item_timestamp: pool.last_item_timestamp(), - }; - reply_channel.send(metrics).unwrap_or_else(|_| { - tracing::debug!("could not send back mempool metrics") - }); - } - MempoolMsg::Status { - items, reply_channel - } => { - reply_channel.send(pool.status(&items)).unwrap_or_else(|_| { - tracing::debug!("could not send back mempool status") - }); - } - } + Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await; } Some((key, item )) = network_items.next() => { pool.add_item(key, item).unwrap_or_else(|e| { tracing::debug!("could not add item to the pool due to: {}", e) }); } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl MempoolService +where + P: MemPool + Send + 'static, + P::Settings: Clone + Send + Sync + 'static, + N::Settings: Clone + Send + Sync + 'static, + P::Item: Clone + Debug + Send + Sync + 'static, + P::Key: Debug + Send + Sync + 'static, + N: NetworkAdapter + Send + Sync + 'static, + D: Discriminant + Send, +{ + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + async fn handle_mempool_message( + message: MempoolMsg, + pool: &mut P, + network_relay: &mut OutboundRelay>, + service_state: &mut ServiceStateHandle, + ) { + match message { + MempoolMsg::Add { + item, + key, + reply_channel, + } => { + match pool.add_item(key, item.clone()) { + Ok(_id) => { + // Broadcast the item to the network + let net = network_relay.clone(); + let settings = service_state.settings_reader.get_updated_settings().network; + // move sending to a new task so local operations can complete in the meantime + tokio::spawn(async move { + let adapter = N::new(settings, net).await; + adapter.send(item).await; + }); + if let Err(e) = reply_channel.send(Ok(())) { + tracing::debug!("Failed to send reply to AddTx: {:?}", e); + } + } + Err(e) => { + tracing::debug!("could not 
add tx to the pool due to: {}", e); + } + } + } + MempoolMsg::View { + ancestor_hint, + reply_channel, + } => { + reply_channel + .send(pool.view(ancestor_hint)) + .unwrap_or_else(|_| tracing::debug!("could not send back pool view")); + } + MempoolMsg::MarkInBlock { ids, block } => { + pool.mark_in_block(&ids, block); + } + #[cfg(test)] + MempoolMsg::BlockItems { + block, + reply_channel, + } => { + reply_channel + .send(pool.block_items(block)) + .unwrap_or_else(|_| tracing::debug!("could not send back block items")); + } + MempoolMsg::Prune { ids } => { + pool.prune(&ids); + } + MempoolMsg::Metrics { reply_channel } => { + let metrics = MempoolMetrics { + pending_items: pool.pending_item_count(), + last_item_timestamp: pool.last_item_timestamp(), + }; + reply_channel + .send(metrics) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics")); + } + MempoolMsg::Status { + items, + reply_channel, + } => { + reply_channel + .send(pool.status(&items)) + .unwrap_or_else(|_| tracing::debug!("could not send back mempool status")); } } } diff --git a/nomos-services/metrics/Cargo.toml b/nomos-services/metrics/Cargo.toml index 6463f109..0d4cb5ed 100644 --- a/nomos-services/metrics/Cargo.toml +++ b/nomos-services/metrics/Cargo.toml @@ -14,9 +14,10 @@ async-graphql = { version = "5", optional = true, features = ["tracing"] } async-trait = "0.1" bytes = "1.3" clap = { version = "4", features = ["derive", "env"], optional = true } +futures = "0.3" nomos-http = { path = "../http", optional = true } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } -overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" } once_cell = "1.16" parking_lot = "0.12" prometheus = "0.13" @@ -27,7 +28,6 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } tower-http = { version = "0.3", features = ["cors", "trace"], optional = true } thiserror = "1" -futures = "0.3" [features] default = [] diff --git a/nomos-services/metrics/src/lib.rs b/nomos-services/metrics/src/lib.rs index d19b4b39..bfa99564 100644 --- a/nomos-services/metrics/src/lib.rs +++ b/nomos-services/metrics/src/lib.rs @@ -1,16 +1,23 @@ +// std use std::fmt::Debug; +// crates +use futures::StreamExt; +use tracing::error; + +// internal +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + relay::RelayMessage, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; + pub mod backend; pub mod frontend; pub mod types; -use overwatch_rs::services::{ - handle::ServiceStateHandle, - relay::{InboundRelay, RelayMessage}, - state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, -}; - #[async_trait::async_trait] pub trait MetricsBackend { type MetricsData: Clone + Send + Sync + Debug + 'static; @@ -22,7 +29,7 @@ pub trait MetricsBackend { } pub struct MetricsService { - inbound_relay: InboundRelay>, + service_state: ServiceStateHandle, backend: Backend, } @@ -128,6 +135,35 @@ impl MetricsService { ) { backend.update(service_id, metrics).await; } + + async fn handle_message(message: MetricsMessage, backend: &mut Backend) { + match message { + MetricsMessage::Load { + service_id, + reply_channel, + } => { + MetricsService::handle_load(backend, &service_id, reply_channel).await; + } + 
MetricsMessage::Update { service_id, data } => { + MetricsService::handle_update(backend, &service_id, data).await; + } + } + } + + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } } #[async_trait::async_trait] @@ -135,28 +171,32 @@ impl ServiceCore for MetricsSer fn init(service_state: ServiceStateHandle) -> Result { let settings = service_state.settings_reader.get_updated_settings(); let backend = Backend::init(settings); - let inbound_relay = service_state.inbound_relay; Ok(Self { - inbound_relay, + service_state, backend, }) } async fn run(self) -> Result<(), overwatch_rs::DynError> { let Self { - mut inbound_relay, + service_state: + ServiceStateHandle { + mut inbound_relay, + lifecycle_handle, + .. + }, mut backend, } = self; - while let Some(message) = inbound_relay.recv().await { - match message { - MetricsMessage::Load { - service_id, - reply_channel, - } => { - MetricsService::handle_load(&backend, &service_id, reply_channel).await; + let mut lifecycle_stream = lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(message) = inbound_relay.recv() => { + Self::handle_message(message, &mut backend).await; } - MetricsMessage::Update { service_id, data } => { - MetricsService::handle_update(&mut backend, &service_id, data).await; + Some(message) = lifecycle_stream.next() => { + if Self::should_stop_service(message).await { + break; + } } } } diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 6ff96058..cd809d5b 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -10,7 +10,7 @@ async-trait = "0.1" bytes = "1.2" chrono = { version = "0.4", optional = true } humantime-serde = { version = "1", optional = true } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } multiaddr = "0.15" serde = { version = "1.0", features = ["derive"] } sscanf = { version = "0.4", optional = true } diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index deaf34d2..6715424f 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -4,17 +4,20 @@ use std::fmt::{self, Debug}; // crates use async_trait::async_trait; +use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use tokio::sync::oneshot; // internal use backends::NetworkBackend; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::RelayMessage, state::{NoOperator, ServiceState}, ServiceCore, ServiceData, ServiceId, }; +use tracing::error; pub enum NetworkMsg { Process(B::Message), @@ -84,33 +87,71 @@ where async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { - service_state: ServiceStateHandle { - mut inbound_relay, .. - }, + service_state: + ServiceStateHandle { + mut inbound_relay, + lifecycle_handle, + .. 
+ }, mut backend, } = self; - - while let Some(msg) = inbound_relay.recv().await { - match msg { - NetworkMsg::Process(msg) => { - // split sending in two steps to help the compiler understand we do not - // need to hold an instance of &I (which is not send) across an await point - let _send = backend.process(msg); - _send.await + let mut lifecycle_stream = lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(msg) = inbound_relay.recv() => { + Self::handle_network_service_message(msg, &mut backend).await; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } } - NetworkMsg::Subscribe { kind, sender } => sender - .send(backend.subscribe(kind).await) - .unwrap_or_else(|_| { - tracing::warn!( - "client hung up before a subscription handle could be established" - ) - }), } } Ok(()) } } +impl NetworkService +where + B: NetworkBackend + Send + 'static, + B::State: Send + Sync, +{ + async fn handle_network_service_message(msg: NetworkMsg, backend: &mut B) { + match msg { + NetworkMsg::Process(msg) => { + // split sending in two steps to help the compiler understand we do not + // need to hold an instance of &I (which is not send) across an await point + let _send = backend.process(msg); + _send.await + } + NetworkMsg::Subscribe { kind, sender } => sender + .send(backend.subscribe(kind).await) + .unwrap_or_else(|_| { + tracing::warn!( + "client hung up before a subscription handle could be established" + ) + }), + } + } + + async fn should_stop_service(msg: LifecycleMessage) -> bool { + match msg { + LifecycleMessage::Kill => true, + LifecycleMessage::Shutdown(signal_sender) => { + // TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages? + if signal_sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + } + } +} + impl Clone for NetworkConfig { fn clone(&self) -> Self { NetworkConfig { diff --git a/nomos-services/storage/Cargo.toml b/nomos-services/storage/Cargo.toml index ebb7ae3f..b2a92743 100644 --- a/nomos-services/storage/Cargo.toml +++ b/nomos-services/storage/Cargo.toml @@ -7,9 +7,10 @@ edition = "2021" [dependencies] async-trait = "0.1" +futures = "0.3" tokio = { version = "1", features = ["sync"] } bytes = "1.2" -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = "1.0" sled = { version = "0.34", optional = true } thiserror = "1.0" diff --git a/nomos-services/storage/src/lib.rs b/nomos-services/storage/src/lib.rs index 53b1a099..476f72a0 100644 --- a/nomos-services/storage/src/lib.rs +++ b/nomos-services/storage/src/lib.rs @@ -6,15 +6,18 @@ use std::marker::PhantomData; // crates use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; use overwatch_rs::services::handle::ServiceStateHandle; use serde::de::DeserializeOwned; use serde::Serialize; // internal use backends::StorageBackend; use backends::{StorageSerde, StorageTransaction}; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::state::{NoOperator, NoState}; use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use tracing::error; /// Storage message that maps to [`StorageBackend`] trait pub enum StorageMsg { @@ -162,6 +165,39 @@ pub struct StorageService { } impl StorageService { + async fn 
should_stop_service(msg: LifecycleMessage) -> bool { + match msg { + LifecycleMessage::Shutdown(sender) => { + // TODO: Try to finish pending transactions if any and close connections properly + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + async fn handle_storage_message(msg: StorageMsg, backend: &mut Backend) { + if let Err(e) = match msg { + StorageMsg::Load { key, reply_channel } => { + Self::handle_load(backend, key, reply_channel).await + } + StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await, + StorageMsg::Remove { key, reply_channel } => { + Self::handle_remove(backend, key, reply_channel).await + } + StorageMsg::Execute { + transaction, + reply_channel, + } => Self::handle_execute(backend, transaction, reply_channel).await, + } { + // TODO: add proper logging + println!("{e}"); + } + } /// Handle load message async fn handle_load( backend: &mut Backend, @@ -243,27 +279,25 @@ impl ServiceCore for StorageSer async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { mut backend, - service_state: ServiceStateHandle { - mut inbound_relay, .. - }, + service_state: + ServiceStateHandle { + mut inbound_relay, + lifecycle_handle, + .. + }, } = self; + let mut lifecycle_stream = lifecycle_handle.message_stream(); let backend = &mut backend; - while let Some(msg) = inbound_relay.recv().await { - if let Err(e) = match msg { - StorageMsg::Load { key, reply_channel } => { - Self::handle_load(backend, key, reply_channel).await + loop { + tokio::select! { + Some(msg) = inbound_relay.recv() => { + Self::handle_storage_message(msg, backend).await; } - StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await, - StorageMsg::Remove { key, reply_channel } => { - Self::handle_remove(backend, key, reply_channel).await + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } } - StorageMsg::Execute { - transaction, - reply_channel, - } => Self::handle_execute(backend, transaction, reply_channel).await, - } { - // TODO: add proper logging - println!("{e}"); } } Ok(()) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 8f761c8e..6e1e5061 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -10,7 +10,7 @@ nomos-consensus = { path = "../nomos-services/consensus" } nomos-network = { path = "../nomos-services/network", features = ["libp2p"]} nomos-log = { path = "../nomos-services/log" } nomos-http = { path = "../nomos-services/http", features = ["http"] } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-core = { path = "../nomos-core" } consensus-engine = { path = "../consensus-engine", features = ["serde"] } nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] }
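
For reference, below is a minimal, self-contained sketch of the lifecycle pattern this patch pipes into each service: a should_stop_service helper plus a tokio::select! loop that polls the lifecycle stream alongside the service's inbound relay. The LifecycleMessage enum, SERVICE_ID constant, the channels, and eprintln! are local stand-ins (not the actual overwatch-rs LifecycleMessage, Self::SERVICE_ID, relay/lifecycle handles, or tracing::error!), so the example compiles on its own with only tokio and futures; it is an illustration of the pattern, not the services' real implementation.

// Distilled sketch of the lifecycle handling added to every service in this patch.
// `LifecycleMessage` and `SERVICE_ID` are stand-ins for the overwatch-rs types used
// in the diff, so this compiles with only tokio (rt + macros) and futures.
use futures::StreamExt;
use tokio::sync::{mpsc, oneshot};

const SERVICE_ID: &str = "ExampleService";

// Stand-in for overwatch_rs::services::life_cycle::LifecycleMessage.
enum LifecycleMessage {
    Shutdown(oneshot::Sender<()>),
    Kill,
}

// Mirrors the `should_stop_service` helper each service now implements:
// acknowledge a graceful shutdown (logging if the ack cannot be delivered,
// here via eprintln! instead of tracing::error!) and stop immediately on Kill.
async fn should_stop_service(message: LifecycleMessage) -> bool {
    match message {
        LifecycleMessage::Shutdown(sender) => {
            if sender.send(()).is_err() {
                eprintln!(
                    "Error sending successful shutdown signal from service {}",
                    SERVICE_ID
                );
            }
            true
        }
        LifecycleMessage::Kill => true,
    }
}

#[tokio::main]
async fn main() {
    // Stand-ins for the service's inbound relay and the lifecycle handle's message stream.
    let (relay_tx, mut inbound_relay) = mpsc::channel::<String>(8);
    let (lifecycle_tx, mut lifecycle_stream) =
        futures::channel::mpsc::unbounded::<LifecycleMessage>();

    // Feed one ordinary service message, then request a graceful shutdown.
    let (ack_tx, ack_rx) = oneshot::channel();
    relay_tx.send("hello".to_owned()).await.unwrap();
    let _ = lifecycle_tx.unbounded_send(LifecycleMessage::Shutdown(ack_tx));

    // The select-loop shape every service's `run` now follows: keep serving
    // inbound messages until a lifecycle message says to stop.
    loop {
        tokio::select! {
            Some(msg) = inbound_relay.recv() => {
                println!("handling service message: {msg}");
            }
            Some(msg) = lifecycle_stream.next() => {
                if should_stop_service(msg).await {
                    break;
                }
            }
        }
    }
    assert!(ack_rx.await.is_ok(), "shutdown was acknowledged");
}

As in the patch, Shutdown is acknowledged over the provided oneshot sender before the loop exits and run returns Ok(()), while Kill stops the service without acknowledgement.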