From ef243b0ee5ab72a87d1b46813970fb094cd6ae78 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Mon, 30 Oct 2023 09:36:07 +0100 Subject: [PATCH] Add Sigkill handling to node binary (#480) * Implement ctrlc watcher service * Add ctrlc service to nomos node bin * Use 1 sized buffer --- Cargo.toml | 1 + nodes/nomos-node/Cargo.toml | 1 + nodes/nomos-node/src/lib.rs | 10 ++-- nodes/nomos-node/src/main.rs | 1 + nomos-services/system-sig/Cargo.toml | 14 ++++++ nomos-services/system-sig/src/lib.rs | 73 ++++++++++++++++++++++++++++ 6 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 nomos-services/system-sig/Cargo.toml create mode 100644 nomos-services/system-sig/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index be01f92a..1c5b70c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "nomos-services/mempool", "nomos-services/http", "nomos-services/data-availability", + "nomos-services/system-sig", "nomos-da/reed-solomon", "nomos-da/kzg", "nomos-da/full-replication", diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 5e028be2..293b6536 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -27,6 +27,7 @@ nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] } +nomos-system-sig = { path = "../../nomos-services/system-sig" } metrics = { path = "../../nomos-services/metrics", optional = true } tracing-subscriber = "0.3" consensus-engine = { path = "../../consensus-engine" } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index 116c2608..bc29d230 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -35,16 +35,17 @@ use nomos_storage::{ StorageService, }; -use nomos_network::NetworkService; -use overwatch_derive::*; -use overwatch_rs::services::handle::ServiceHandle; - pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs}; use nomos_core::{ da::certificate::select::FillSize as FillSizeWithBlobsCertificate, tx::select::FillSize as FillSizeWithTx, }; +use nomos_network::NetworkService; +use nomos_system_sig::SystemSig; +use overwatch_derive::*; +use overwatch_rs::services::handle::ServiceHandle; use serde::{de::DeserializeOwned, Serialize}; + pub use tx::Tx; pub const CL_TOPIC: &str = "cl"; @@ -93,6 +94,7 @@ pub struct Nomos { metrics: ServiceHandle>>, da: ServiceHandle, storage: ServiceHandle>>, + system_sig: ServiceHandle, } pub struct Wire; diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index d0cf26e1..bca6325c 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -106,6 +106,7 @@ fn main() -> Result<()> { storage: nomos_storage::backends::sled::SledBackendSettings { db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), }, + system_sig: (), }, None, ) diff --git a/nomos-services/system-sig/Cargo.toml b/nomos-services/system-sig/Cargo.toml new file mode 100644 index 00000000..98e01c0b --- /dev/null +++ b/nomos-services/system-sig/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "nomos-system-sig" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +async-ctrlc = "1.2" +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +tokio = "1.33.0" +log = "0.4.20" +futures = "0.3.28" diff --git a/nomos-services/system-sig/src/lib.rs b/nomos-services/system-sig/src/lib.rs new file mode 100644 index 00000000..ff4d54b2 --- /dev/null +++ b/nomos-services/system-sig/src/lib.rs @@ -0,0 +1,73 @@ +// std + +// crates +use futures::stream::StreamExt; +use log::error; +// internal +use overwatch_rs::overwatch::handle::OverwatchHandle; +use overwatch_rs::services::handle::ServiceStateHandle; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::relay::NoMessage; +use overwatch_rs::services::state::{NoOperator, NoState}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::DynError; + +pub struct SystemSig { + service_state: ServiceStateHandle, +} + +impl SystemSig { + async fn should_stop_service(msg: LifecycleMessage) -> bool { + match msg { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + async fn ctrlc_signal_received(overwatch_handle: &OverwatchHandle) { + overwatch_handle.kill().await + } +} + +impl ServiceData for SystemSig { + const SERVICE_ID: ServiceId = "SystemSig"; + const SERVICE_RELAY_BUFFER_SIZE: usize = 1; + type Settings = (); + type State = NoState; + type StateOperator = NoOperator; + type Message = NoMessage; +} + +#[async_trait::async_trait] +impl ServiceCore for SystemSig { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { service_state }) + } + + async fn run(self) -> Result<(), DynError> { + let Self { service_state } = self; + let mut ctrlc = async_ctrlc::CtrlC::new()?; + let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); + loop { + tokio::select! { + _ = &mut ctrlc => { + Self::ctrlc_signal_received(&service_state.overwatch_handle).await; + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +}