diff --git a/simulations/src/bin/app/main.rs b/simulations/src/bin/app/main.rs index d9fea65e..c7298556 100644 --- a/simulations/src/bin/app/main.rs +++ b/simulations/src/bin/app/main.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; // crates use anyhow::Ok; use clap::Parser; @@ -172,6 +172,7 @@ where } fn signal(handle: SimulationRunnerHandle) -> anyhow::Result<()> { + let handle = Arc::new(handle); let (tx, rx) = crossbeam::channel::bounded(1); ctrlc::set_handler(move || { tx.send(()).unwrap(); @@ -180,10 +181,16 @@ fn signal(handle: SimulationRunnerHandle) -> anyhow::Result<()> { crossbeam::select! { recv(rx) -> _ => { handle.stop()?; - tracing::info!("gracefully shutwon the simulation app"); + tracing::info!("gracefully shutdown the simulation app"); break; }, - default => {} + default => { + if handle.is_finished() { + handle.shutdown()?; + break; + } + std::thread::sleep(Duration::from_millis(50)); + } } } Ok(()) diff --git a/simulations/src/runner/mod.rs b/simulations/src/runner/mod.rs index a70b0d7f..c37f1312 100644 --- a/simulations/src/runner/mod.rs +++ b/simulations/src/runner/mod.rs @@ -40,10 +40,10 @@ impl SimulationRunnerHandle { self.stop() } - pub fn stop(self) -> anyhow::Result<()> { + pub fn stop(&self) -> anyhow::Result<()> { if !self.handle.is_finished() { self.stop_tx.send(())?; - self.producer.stop()?; + self.shutdown()?; } Ok(()) } @@ -55,6 +55,14 @@ impl SimulationRunnerHandle { self.producer.subscribe(settings) } + pub fn is_finished(&self) -> bool { + self.handle.is_finished() + } + + pub fn shutdown(&self) -> anyhow::Result<()> { + self.producer.stop() + } + pub fn join(self) -> anyhow::Result<()> { self.handle.join().expect("Join simulation thread") } diff --git a/simulations/src/streaming/io.rs b/simulations/src/streaming/io.rs index 41f2893f..9332731b 100644 --- a/simulations/src/streaming/io.rs +++ b/simulations/src/streaming/io.rs @@ -2,6 +2,7 @@ use std::{any::Any, io::stdout, sync::Arc}; use super::{Receivers, StreamSettings, Subscriber}; use crate::output_processors::{RecordType, Runtime}; +use crossbeam::channel::{Receiver, Sender}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; @@ -61,8 +62,8 @@ where type Settings = IOStreamSettings; fn new( - record_recv: crossbeam::channel::Receiver>, - stop_recv: crossbeam::channel::Receiver<()>, + record_recv: Receiver>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where @@ -84,18 +85,22 @@ where fn run(self) -> anyhow::Result<()> { loop { crossbeam::select! { - recv(self.recvs.stop_rx) -> _ => { + recv(self.recvs.stop_rx) -> finish_tx => { + // Flush remaining messages after stop signal. + while let Ok(msg) = self.recvs.recv.try_recv() { + self.sink(msg)?; + } + // collect the run time meta self.sink(Arc::new(R::from(Runtime::load()?)))?; - break; + + finish_tx?.send(())? } recv(self.recvs.recv) -> msg => { self.sink(msg?)?; } } } - - Ok(()) } fn sink(&self, state: Arc) -> anyhow::Result<()> { diff --git a/simulations/src/streaming/mod.rs b/simulations/src/streaming/mod.rs index 6d6a5d7b..1acd7c1a 100644 --- a/simulations/src/streaming/mod.rs +++ b/simulations/src/streaming/mod.rs @@ -23,7 +23,7 @@ pub enum SubscriberType { #[derive(Debug)] struct Receivers { - stop_rx: Receiver<()>, + stop_rx: Receiver>, recv: Receiver>, } @@ -99,7 +99,7 @@ impl StreamSettings { pub struct SubscriberHandle { handle: Option>>, - stop_tx: Sender<()>, + stop_tx: Sender>, subscriber: Option, } @@ -127,7 +127,9 @@ where if let Some(handle) = self.handle { // if we have a handle, and the handle is not finished if !handle.is_finished() { - self.stop_tx.send(())?; + let (finish_tx, finish_rx) = bounded(1); + self.stop_tx.send(finish_tx)?; + finish_rx.recv()?; } else { // we are sure the handle is finished, so we can join it and try to get the result. // if we have any error on subscriber side, return the error. @@ -151,7 +153,7 @@ where struct Senders { record_ty: RecordType, record_sender: Sender>, - stop_sender: Sender<()>, + stop_sender: Sender>, } #[derive(Debug)] @@ -253,7 +255,7 @@ where }) } - pub fn stop(self) -> anyhow::Result<()> { + pub fn stop(&self) -> anyhow::Result<()> { let meta_record = Arc::new(R::from(Runtime::load()?)); let inner = self.inner.lock().unwrap(); @@ -268,8 +270,11 @@ where // send stop signal to all subscribers inner.senders.iter().for_each(|tx| { - if let Err(e) = tx.stop_sender.send(()) { + let (finish_tx, finish_rx) = bounded(1); + if let Err(e) = tx.stop_sender.send(finish_tx) { tracing::error!("Error stopping subscriber: {e}"); + } else if let Err(e) = finish_rx.recv() { + tracing::error!("Error finilizing subscriber: {e}"); } }); Ok(()) @@ -282,7 +287,7 @@ pub trait Subscriber { fn new( record_recv: Receiver>, - stop_recv: Receiver<()>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where diff --git a/simulations/src/streaming/naive.rs b/simulations/src/streaming/naive.rs index 3f18f7ad..5ff33efc 100644 --- a/simulations/src/streaming/naive.rs +++ b/simulations/src/streaming/naive.rs @@ -1,5 +1,6 @@ use super::{Receivers, StreamSettings, Subscriber}; use crate::output_processors::{RecordType, Runtime}; +use crossbeam::channel::{Receiver, Sender}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use std::{ @@ -49,8 +50,8 @@ where type Settings = NaiveSettings; fn new( - record_recv: crossbeam::channel::Receiver>, - stop_recv: crossbeam::channel::Receiver<()>, + record_recv: Receiver>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where @@ -86,18 +87,22 @@ where fn run(self) -> anyhow::Result<()> { loop { crossbeam::select! { - recv(self.recvs.stop_rx) -> _ => { + recv(self.recvs.stop_rx) -> finish_tx => { + // Flush remaining messages after stop signal. + while let Ok(msg) = self.recvs.recv.try_recv() { + self.sink(msg)?; + } + // collect the run time meta self.sink(Arc::new(R::from(Runtime::load()?)))?; - break; + + finish_tx?.send(())? } recv(self.recvs.recv) -> msg => { self.sink(msg?)?; } } } - - Ok(()) } fn sink(&self, state: Arc) -> anyhow::Result<()> { diff --git a/simulations/src/streaming/polars.rs b/simulations/src/streaming/polars.rs index c000efec..249e1778 100644 --- a/simulations/src/streaming/polars.rs +++ b/simulations/src/streaming/polars.rs @@ -1,5 +1,6 @@ use super::{Receivers, StreamSettings}; use crate::output_processors::{RecordType, Runtime}; +use crossbeam::channel::{Receiver, Sender}; use parking_lot::Mutex; use polars::prelude::*; use serde::{Deserialize, Serialize}; @@ -97,8 +98,8 @@ where type Settings = PolarsSettings; fn new( - record_recv: crossbeam::channel::Receiver>, - stop_recv: crossbeam::channel::Receiver<()>, + record_recv: Receiver>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where @@ -137,9 +138,16 @@ where fn run(self) -> anyhow::Result<()> { loop { crossbeam::select! { - recv(self.recvs.stop_rx) -> _ => { + recv(self.recvs.stop_rx) -> finish_tx => { + // Flush remaining messages after stop signal. + while let Ok(msg) = self.recvs.recv.try_recv() { + self.sink(msg)?; + } + // collect the run time meta self.sink(Arc::new(R::from(Runtime::load()?)))?; + + finish_tx?.send(())?; return self.persist(); } recv(self.recvs.recv) -> msg => { diff --git a/simulations/src/streaming/runtime_subscriber.rs b/simulations/src/streaming/runtime_subscriber.rs index 036f1923..f4c36bec 100644 --- a/simulations/src/streaming/runtime_subscriber.rs +++ b/simulations/src/streaming/runtime_subscriber.rs @@ -1,5 +1,6 @@ use super::{Receivers, Subscriber}; use crate::output_processors::{RecordType, Runtime}; +use crossbeam::channel::{Receiver, Sender}; use serde::{Deserialize, Serialize}; use std::{ fs::{File, OpenOptions}, @@ -37,8 +38,8 @@ where type Settings = RuntimeSettings; fn new( - record_recv: crossbeam::channel::Receiver>, - stop_recv: crossbeam::channel::Receiver<()>, + record_recv: Receiver>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where @@ -73,9 +74,10 @@ where fn run(self) -> anyhow::Result<()> { crossbeam::select! { - recv(self.recvs.stop_rx) -> _ => { + recv(self.recvs.stop_rx) -> finish_tx => { // collect the run time meta self.sink(Arc::new(R::from(Runtime::load()?)))?; + finish_tx?.send(())?; } recv(self.recvs.recv) -> msg => { self.sink(msg?)?; diff --git a/simulations/src/streaming/settings_subscriber.rs b/simulations/src/streaming/settings_subscriber.rs index c5835efc..2d3aa07c 100644 --- a/simulations/src/streaming/settings_subscriber.rs +++ b/simulations/src/streaming/settings_subscriber.rs @@ -1,5 +1,6 @@ use super::{Receivers, Subscriber}; use crate::output_processors::{RecordType, Runtime}; +use crossbeam::channel::{Receiver, Sender}; use serde::{Deserialize, Serialize}; use std::{ fs::{File, OpenOptions}, @@ -37,8 +38,8 @@ where type Settings = SettingsSubscriberSettings; fn new( - record_recv: crossbeam::channel::Receiver>, - stop_recv: crossbeam::channel::Receiver<()>, + record_recv: Receiver>, + stop_recv: Receiver>, settings: Self::Settings, ) -> anyhow::Result where @@ -73,9 +74,10 @@ where fn run(self) -> anyhow::Result<()> { crossbeam::select! { - recv(self.recvs.stop_rx) -> _ => { + recv(self.recvs.stop_rx) -> finish_tx => { // collect the run time meta self.sink(Arc::new(R::from(Runtime::load()?)))?; + finish_tx?.send(())?; } recv(self.recvs.recv) -> msg => { self.sink(msg?)?;