1
0
mirror of synced 2025-01-11 08:15:48 +00:00

Fix #182: gracefully shutdown (#187)

* finish signal and cleanup
This commit is contained in:
Al Liu 2023-06-15 17:40:29 +08:00 committed by GitHub
parent f631d9b737
commit b6789b994e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 34 deletions

View File

@ -14,6 +14,7 @@ anyhow = "1"
arc-swap = "1.6" arc-swap = "1.6"
bls-signatures = "0.14" bls-signatures = "0.14"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
ctrlc = "3.4"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
crc32fast = "1.3" crc32fast = "1.3"
crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] } crossbeam = { version = "0.8.2", features = ["crossbeam-channel"] }

View File

@ -22,10 +22,11 @@ use simulations::network::regions::{create_regions, RegionsData};
use simulations::network::{InMemoryNetworkInterface, Network}; use simulations::network::{InMemoryNetworkInterface, Network};
use simulations::node::dummy::DummyNode; use simulations::node::dummy::DummyNode;
use simulations::node::{Node, NodeId, OverlayState, ViewOverlay}; use simulations::node::{Node, NodeId, OverlayState, ViewOverlay};
use simulations::output_processors::Record;
use simulations::overlay::{create_overlay, SimulationOverlay}; use simulations::overlay::{create_overlay, SimulationOverlay};
use simulations::runner::SimulationRunnerHandle;
use simulations::streaming::{ use simulations::streaming::{
io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, StreamType,
runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamType,
}; };
// internal // internal
use simulations::{ use simulations::{
@ -162,47 +163,41 @@ where
let stream_settings = settings.stream_settings.clone(); let stream_settings = settings.stream_settings.clone();
let runner = let runner =
SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?; SimulationRunner::<_, _, OutData>::new(network, nodes, Default::default(), settings)?;
macro_rules! bail {
($settings: ident, $sub: ident) => {
let handle = runner.simulate()?;
let mut data_subscriber_handle = handle.subscribe::<$sub<OutData>>($settings)?;
let mut runtime_subscriber_handle =
handle.subscribe::<RuntimeSubscriber<OutData>>(Default::default())?;
let mut settings_subscriber_handle =
handle.subscribe::<SettingsSubscriber<OutData>>(Default::default())?;
std::thread::scope(|s| {
s.spawn(move || {
data_subscriber_handle.run();
});
s.spawn(move || { let handle = match stream_type {
runtime_subscriber_handle.run();
});
s.spawn(move || {
settings_subscriber_handle.run();
});
});
handle.join()?;
};
}
match stream_type {
Some(StreamType::Naive) => { Some(StreamType::Naive) => {
let settings = stream_settings.unwrap_naive(); let settings = stream_settings.unwrap_naive();
bail!(settings, NaiveSubscriber); runner.simulate_and_subscribe::<NaiveSubscriber<OutData>>(settings)?
} }
Some(StreamType::IO) => { Some(StreamType::IO) => {
let settings = stream_settings.unwrap_io(); let settings = stream_settings.unwrap_io();
bail!(settings, IOSubscriber); runner.simulate_and_subscribe::<IOSubscriber<OutData>>(settings)?
} }
Some(StreamType::Polars) => { Some(StreamType::Polars) => {
let settings = stream_settings.unwrap_polars(); let settings = stream_settings.unwrap_polars();
bail!(settings, PolarsSubscriber); runner.simulate_and_subscribe::<PolarsSubscriber<OutData>>(settings)?
}
None => {
runner.simulate()?.join()?;
} }
None => runner.simulate()?,
}; };
signal(handle)
}
fn signal<R: Record>(handle: SimulationRunnerHandle<R>) -> anyhow::Result<()> {
let (tx, rx) = crossbeam::channel::bounded(1);
ctrlc::set_handler(move || {
tx.send(()).unwrap();
})?;
loop {
crossbeam::select! {
recv(rx) -> _ => {
handle.stop()?;
tracing::info!("gracefully shutwon the simulation app");
break;
},
default => {}
}
}
Ok(()) Ok(())
} }
@ -241,7 +236,7 @@ fn main() -> anyhow::Result<()> {
let app: SimulationApp = SimulationApp::parse(); let app: SimulationApp = SimulationApp::parse();
if let Err(e) = app.run() { if let Err(e) = app.run() {
tracing::error!("Error: {}", e); tracing::error!("error: {}", e);
std::process::exit(1); std::process::exit(1);
} }
Ok(()) Ok(())

View File

@ -9,7 +9,10 @@ use std::time::Duration;
use crate::output_processors::Record; use crate::output_processors::Record;
// crates // crates
use crate::streaming::{StreamProducer, Subscriber, SubscriberHandle}; use crate::streaming::{
runtime_subscriber::RuntimeSubscriber, settings_subscriber::SettingsSubscriber, StreamProducer,
Subscriber, SubscriberHandle,
};
use crossbeam::channel::Sender; use crossbeam::channel::Sender;
use parking_lot::RwLock; use parking_lot::RwLock;
use rand::rngs::SmallRng; use rand::rngs::SmallRng;
@ -176,3 +179,47 @@ where
} }
} }
} }
impl<M, N: Node, R> SimulationRunner<M, N, R>
where
M: Clone + Send + Sync + 'static,
N: Send + Sync + 'static,
N::Settings: Clone + Send,
N::State: Serialize,
R: Record
+ serde::Serialize
+ for<'a> TryFrom<&'a SimulationState<N>, Error = anyhow::Error>
+ Send
+ Sync
+ 'static,
{
pub fn simulate_and_subscribe<S>(
self,
settings: S::Settings,
) -> anyhow::Result<SimulationRunnerHandle<R>>
where
S: Subscriber<Record = R> + Send + Sync + 'static,
{
let handle = self.simulate()?;
let mut data_subscriber_handle = handle.subscribe::<S>(settings)?;
let mut runtime_subscriber_handle =
handle.subscribe::<RuntimeSubscriber<R>>(Default::default())?;
let mut settings_subscriber_handle =
handle.subscribe::<SettingsSubscriber<R>>(Default::default())?;
std::thread::scope(|s| {
s.spawn(move || {
data_subscriber_handle.run();
});
s.spawn(move || {
runtime_subscriber_handle.run();
});
s.spawn(move || {
settings_subscriber_handle.run();
});
});
Ok(handle)
}
}