From b6789b994eee90bd8412fe2478cd7ad9c08c6495 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 15 Jun 2023 17:40:29 +0800 Subject: [PATCH] Fix #182: gracefully shutdown (#187) * finish signal and cleanup --- simulations/Cargo.toml | 1 + simulations/src/bin/app.rs | 61 ++++++++++++++++------------------- simulations/src/runner/mod.rs | 49 +++++++++++++++++++++++++++- 3 files changed, 77 insertions(+), 34 deletions(-) diff --git a/simulations/Cargo.toml b/simulations/Cargo.toml index ae66cbad..86ef6f5f 100644 --- a/simulations/Cargo.toml +++ b/simulations/Cargo.toml @@ -14,6 +14,7 @@ anyhow = "1" arc-swap = "1.6" bls-signatures = "0.14" clap = { version = "4", features = ["derive"] } +ctrlc = "3.4" chrono = { version = "0.4", features = ["serde"] } crc32fast = "1.3" crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } diff --git a/simulations/src/bin/app.rs b/simulations/src/bin/app.rs index e24baf71..27e2b430 100644 --- a/simulations/src/bin/app.rs +++ b/simulations/src/bin/app.rs @@ -22,10 +22,11 @@ use simulations::network::regions::{create_regions, RegionsData}; use simulations::network::{InMemoryNetworkInterface, Network}; use simulations::node::dummy::DummyNode; use simulations::node::{Node, NodeId, OverlayState, ViewOverlay}; +use simulations::output_processors::Record; use simulations::overlay::{create_overlay, SimulationOverlay}; +use simulations::runner::SimulationRunnerHandle; use simulations::streaming::{ - io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, - runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamType, + io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, StreamType, }; // internal use simulations::{ @@ -162,47 +163,41 @@ where let stream_settings = settings.stream_settings.clone(); let runner = SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?; - macro_rules! bail { - ($settings: ident, $sub: ident) => { - let handle = runner.simulate()?; - let mut data_subscriber_handle = handle.subscribe::<$sub>($settings)?; - let mut runtime_subscriber_handle = - handle.subscribe::>(Default::default())?; - let mut settings_subscriber_handle = - handle.subscribe::>(Default::default())?; - std::thread::scope(|s| { - s.spawn(move || { - data_subscriber_handle.run(); - }); - s.spawn(move || { - runtime_subscriber_handle.run(); - }); - - s.spawn(move || { - settings_subscriber_handle.run(); - }); - }); - handle.join()?; - }; - } - match stream_type { + let handle = match stream_type { Some(StreamType::Naive) => { let settings = stream_settings.unwrap_naive(); - bail!(settings, NaiveSubscriber); + runner.simulate_and_subscribe::>(settings)? } Some(StreamType::IO) => { let settings = stream_settings.unwrap_io(); - bail!(settings, IOSubscriber); + runner.simulate_and_subscribe::>(settings)? } Some(StreamType::Polars) => { let settings = stream_settings.unwrap_polars(); - bail!(settings, PolarsSubscriber); - } - None => { - runner.simulate()?.join()?; + runner.simulate_and_subscribe::>(settings)? } + None => runner.simulate()?, }; + + signal(handle) +} + +fn signal(handle: SimulationRunnerHandle) -> anyhow::Result<()> { + let (tx, rx) = crossbeam::channel::bounded(1); + ctrlc::set_handler(move || { + tx.send(()).unwrap(); + })?; + loop { + crossbeam::select! { + recv(rx) -> _ => { + handle.stop()?; + tracing::info!("gracefully shutwon the simulation app"); + break; + }, + default => {} + } + } Ok(()) } @@ -241,7 +236,7 @@ fn main() -> anyhow::Result<()> { let app: SimulationApp = SimulationApp::parse(); if let Err(e) = app.run() { - tracing::error!("Error: {}", e); + tracing::error!("error: {}", e); std::process::exit(1); } Ok(()) diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index fddc82aa..5d265f16 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -9,7 +9,10 @@ use std::time::Duration; use crate::output_processors::Record; // crates -use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle}; +use crate::streaming::{ + runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamProducer, + Subscriber, SubscriberHandle, +}; use crossbeam::channel::Sender; use parking_lot::RwLock; use rand::rngs::SmallRng; @@ -176,3 +179,47 @@ where } } } + +impl SimulationRunner +where + M: Clone + Send + Sync + 'static, + N: Send + Sync + 'static, + N::Settings: Clone + Send, + N::State: Serialize, + R: Record + + serde::Serialize + + for<'a> TryFrom<&'a SimulationState, Error = anyhow::Error> + + Send + + Sync + + 'static, +{ + pub fn simulate_and_subscribe( + self, + settings: S::Settings, + ) -> anyhow::Result> + where + S: Subscriber + Send + Sync + 'static, + { + let handle = self.simulate()?; + let mut data_subscriber_handle = handle.subscribe::(settings)?; + let mut runtime_subscriber_handle = + handle.subscribe::>(Default::default())?; + let mut settings_subscriber_handle = + handle.subscribe::>(Default::default())?; + std::thread::scope(|s| { + s.spawn(move || { + data_subscriber_handle.run(); + }); + + s.spawn(move || { + runtime_subscriber_handle.run(); + }); + + s.spawn(move || { + settings_subscriber_handle.run(); + }); + }); + + Ok(handle) + } +}