Add Sigkill handling to node binary (#480)
* Implement ctrlc watcher service * Add ctrlc service to nomos node bin * Use 1 sized buffer
This commit is contained in:
parent
d479721efd
commit
ef243b0ee5
|
@ -10,6 +10,7 @@ members = [
|
||||||
"nomos-services/mempool",
|
"nomos-services/mempool",
|
||||||
"nomos-services/http",
|
"nomos-services/http",
|
||||||
"nomos-services/data-availability",
|
"nomos-services/data-availability",
|
||||||
|
"nomos-services/system-sig",
|
||||||
"nomos-da/reed-solomon",
|
"nomos-da/reed-solomon",
|
||||||
"nomos-da/kzg",
|
"nomos-da/kzg",
|
||||||
"nomos-da/full-replication",
|
"nomos-da/full-replication",
|
||||||
|
|
|
@ -27,6 +27,7 @@ nomos-consensus = { path = "../../nomos-services/consensus", features = ["libp2p
|
||||||
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
|
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
|
||||||
nomos-libp2p = { path = "../../nomos-libp2p" }
|
nomos-libp2p = { path = "../../nomos-libp2p" }
|
||||||
nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] }
|
nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] }
|
||||||
|
nomos-system-sig = { path = "../../nomos-services/system-sig" }
|
||||||
metrics = { path = "../../nomos-services/metrics", optional = true }
|
metrics = { path = "../../nomos-services/metrics", optional = true }
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
consensus-engine = { path = "../../consensus-engine" }
|
consensus-engine = { path = "../../consensus-engine" }
|
||||||
|
|
|
@ -35,16 +35,17 @@ use nomos_storage::{
|
||||||
StorageService,
|
StorageService,
|
||||||
};
|
};
|
||||||
|
|
||||||
use nomos_network::NetworkService;
|
|
||||||
use overwatch_derive::*;
|
|
||||||
use overwatch_rs::services::handle::ServiceHandle;
|
|
||||||
|
|
||||||
pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs};
|
pub use config::{Config, ConsensusArgs, HttpArgs, LogArgs, NetworkArgs, OverlayArgs};
|
||||||
use nomos_core::{
|
use nomos_core::{
|
||||||
da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
|
da::certificate::select::FillSize as FillSizeWithBlobsCertificate,
|
||||||
tx::select::FillSize as FillSizeWithTx,
|
tx::select::FillSize as FillSizeWithTx,
|
||||||
};
|
};
|
||||||
|
use nomos_network::NetworkService;
|
||||||
|
use nomos_system_sig::SystemSig;
|
||||||
|
use overwatch_derive::*;
|
||||||
|
use overwatch_rs::services::handle::ServiceHandle;
|
||||||
use serde::{de::DeserializeOwned, Serialize};
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
|
||||||
pub use tx::Tx;
|
pub use tx::Tx;
|
||||||
|
|
||||||
pub const CL_TOPIC: &str = "cl";
|
pub const CL_TOPIC: &str = "cl";
|
||||||
|
@ -93,6 +94,7 @@ pub struct Nomos {
|
||||||
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
|
metrics: ServiceHandle<MetricsService<MapMetricsBackend<MetricsData>>>,
|
||||||
da: ServiceHandle<DataAvailability>,
|
da: ServiceHandle<DataAvailability>,
|
||||||
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
|
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
|
||||||
|
system_sig: ServiceHandle<SystemSig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Wire;
|
pub struct Wire;
|
||||||
|
|
|
@ -106,6 +106,7 @@ fn main() -> Result<()> {
|
||||||
storage: nomos_storage::backends::sled::SledBackendSettings {
|
storage: nomos_storage::backends::sled::SledBackendSettings {
|
||||||
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
|
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
|
||||||
},
|
},
|
||||||
|
system_sig: (),
|
||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
[package]
|
||||||
|
name = "nomos-system-sig"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-trait = "0.1"
|
||||||
|
async-ctrlc = "1.2"
|
||||||
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||||
|
tokio = "1.33.0"
|
||||||
|
log = "0.4.20"
|
||||||
|
futures = "0.3.28"
|
|
@ -0,0 +1,73 @@
|
||||||
|
// std
|
||||||
|
|
||||||
|
// crates
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
use log::error;
|
||||||
|
// internal
|
||||||
|
use overwatch_rs::overwatch::handle::OverwatchHandle;
|
||||||
|
use overwatch_rs::services::handle::ServiceStateHandle;
|
||||||
|
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||||
|
use overwatch_rs::services::relay::NoMessage;
|
||||||
|
use overwatch_rs::services::state::{NoOperator, NoState};
|
||||||
|
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
|
||||||
|
use overwatch_rs::DynError;
|
||||||
|
|
||||||
|
pub struct SystemSig {
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SystemSig {
|
||||||
|
async fn should_stop_service(msg: LifecycleMessage) -> bool {
|
||||||
|
match msg {
|
||||||
|
LifecycleMessage::Shutdown(sender) => {
|
||||||
|
if sender.send(()).is_err() {
|
||||||
|
error!(
|
||||||
|
"Error sending successful shutdown signal from service {}",
|
||||||
|
Self::SERVICE_ID
|
||||||
|
);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
LifecycleMessage::Kill => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ctrlc_signal_received(overwatch_handle: &OverwatchHandle) {
|
||||||
|
overwatch_handle.kill().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ServiceData for SystemSig {
|
||||||
|
const SERVICE_ID: ServiceId = "SystemSig";
|
||||||
|
const SERVICE_RELAY_BUFFER_SIZE: usize = 1;
|
||||||
|
type Settings = ();
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = NoMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ServiceCore for SystemSig {
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
|
||||||
|
Ok(Self { service_state })
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(self) -> Result<(), DynError> {
|
||||||
|
let Self { service_state } = self;
|
||||||
|
let mut ctrlc = async_ctrlc::CtrlC::new()?;
|
||||||
|
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = &mut ctrlc => {
|
||||||
|
Self::ctrlc_signal_received(&service_state.overwatch_handle).await;
|
||||||
|
}
|
||||||
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
|
if Self::should_stop_service(msg).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue