Simulation graceful shutdown fixes (#305)

* Handle finished simulation

* Flush remaining records to the sink

* Write runtime metadata last
This commit is contained in:
gusto 2023-08-16 11:08:03 +03:00 committed by GitHub
parent 2612c77306
commit fe3d39071d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 75 additions and 33 deletions

View File

@ -2,7 +2,7 @@
use std::fs::File; use std::fs::File;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
// crates // crates
use anyhow::Ok; use anyhow::Ok;
use clap::Parser; use clap::Parser;
@ -172,6 +172,7 @@ where
} }
fn signal<R: Record>(handle: SimulationRunnerHandle<R>) -> anyhow::Result<()> { fn signal<R: Record>(handle: SimulationRunnerHandle<R>) -> anyhow::Result<()> {
let handle = Arc::new(handle);
let (tx, rx) = crossbeam::channel::bounded(1); let (tx, rx) = crossbeam::channel::bounded(1);
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
tx.send(()).unwrap(); tx.send(()).unwrap();
@ -180,10 +181,16 @@ fn signal<R: Record>(handle: SimulationRunnerHandle<R>) -> anyhow::Result<()> {
crossbeam::select! { crossbeam::select! {
recv(rx) -> _ => { recv(rx) -> _ => {
handle.stop()?; handle.stop()?;
tracing::info!("gracefully shutwon the simulation app"); tracing::info!("gracefully shutdown the simulation app");
break; break;
}, },
default => {} default => {
if handle.is_finished() {
handle.shutdown()?;
break;
}
std::thread::sleep(Duration::from_millis(50));
}
} }
} }
Ok(()) Ok(())

View File

@ -40,10 +40,10 @@ impl<R: Record> SimulationRunnerHandle<R> {
self.stop() self.stop()
} }
pub fn stop(self) -> anyhow::Result<()> { pub fn stop(&self) -> anyhow::Result<()> {
if !self.handle.is_finished() { if !self.handle.is_finished() {
self.stop_tx.send(())?; self.stop_tx.send(())?;
self.producer.stop()?; self.shutdown()?;
} }
Ok(()) Ok(())
} }
@ -55,6 +55,14 @@ impl<R: Record> SimulationRunnerHandle<R> {
self.producer.subscribe(settings) self.producer.subscribe(settings)
} }
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}
pub fn shutdown(&self) -> anyhow::Result<()> {
self.producer.stop()
}
pub fn join(self) -> anyhow::Result<()> { pub fn join(self) -> anyhow::Result<()> {
self.handle.join().expect("Join simulation thread") self.handle.join().expect("Join simulation thread")
} }

View File

@ -2,6 +2,7 @@ use std::{any::Any, io::stdout, sync::Arc};
use super::{Receivers, StreamSettings, Subscriber}; use super::{Receivers, StreamSettings, Subscriber};
use crate::output_processors::{RecordType, Runtime}; use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -61,8 +62,8 @@ where
type Settings = IOStreamSettings; type Settings = IOStreamSettings;
fn new( fn new(
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: crossbeam::channel::Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
@ -84,18 +85,22 @@ where
fn run(self) -> anyhow::Result<()> { fn run(self) -> anyhow::Result<()> {
loop { loop {
crossbeam::select! { crossbeam::select! {
recv(self.recvs.stop_rx) -> _ => { recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta // collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?; self.sink(Arc::new(R::from(Runtime::load()?)))?;
break;
finish_tx?.send(())?
} }
recv(self.recvs.recv) -> msg => { recv(self.recvs.recv) -> msg => {
self.sink(msg?)?; self.sink(msg?)?;
} }
} }
} }
Ok(())
} }
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> { fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {

View File

@ -23,7 +23,7 @@ pub enum SubscriberType {
#[derive(Debug)] #[derive(Debug)]
struct Receivers<R> { struct Receivers<R> {
stop_rx: Receiver<()>, stop_rx: Receiver<Sender<()>>,
recv: Receiver<Arc<R>>, recv: Receiver<Arc<R>>,
} }
@ -99,7 +99,7 @@ impl StreamSettings {
pub struct SubscriberHandle<S> { pub struct SubscriberHandle<S> {
handle: Option<std::thread::JoinHandle<anyhow::Result<()>>>, handle: Option<std::thread::JoinHandle<anyhow::Result<()>>>,
stop_tx: Sender<()>, stop_tx: Sender<Sender<()>>,
subscriber: Option<S>, subscriber: Option<S>,
} }
@ -127,7 +127,9 @@ where
if let Some(handle) = self.handle { if let Some(handle) = self.handle {
// if we have a handle, and the handle is not finished // if we have a handle, and the handle is not finished
if !handle.is_finished() { if !handle.is_finished() {
self.stop_tx.send(())?; let (finish_tx, finish_rx) = bounded(1);
self.stop_tx.send(finish_tx)?;
finish_rx.recv()?;
} else { } else {
// we are sure the handle is finished, so we can join it and try to get the result. // we are sure the handle is finished, so we can join it and try to get the result.
// if we have any error on subscriber side, return the error. // if we have any error on subscriber side, return the error.
@ -151,7 +153,7 @@ where
struct Senders<R> { struct Senders<R> {
record_ty: RecordType, record_ty: RecordType,
record_sender: Sender<Arc<R>>, record_sender: Sender<Arc<R>>,
stop_sender: Sender<()>, stop_sender: Sender<Sender<()>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -253,7 +255,7 @@ where
}) })
} }
pub fn stop(self) -> anyhow::Result<()> { pub fn stop(&self) -> anyhow::Result<()> {
let meta_record = Arc::new(R::from(Runtime::load()?)); let meta_record = Arc::new(R::from(Runtime::load()?));
let inner = self.inner.lock().unwrap(); let inner = self.inner.lock().unwrap();
@ -268,8 +270,11 @@ where
// send stop signal to all subscribers // send stop signal to all subscribers
inner.senders.iter().for_each(|tx| { inner.senders.iter().for_each(|tx| {
if let Err(e) = tx.stop_sender.send(()) { let (finish_tx, finish_rx) = bounded(1);
if let Err(e) = tx.stop_sender.send(finish_tx) {
tracing::error!("Error stopping subscriber: {e}"); tracing::error!("Error stopping subscriber: {e}");
} else if let Err(e) = finish_rx.recv() {
tracing::error!("Error finilizing subscriber: {e}");
} }
}); });
Ok(()) Ok(())
@ -282,7 +287,7 @@ pub trait Subscriber {
fn new( fn new(
record_recv: Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where

View File

@ -1,5 +1,6 @@
use super::{Receivers, StreamSettings, Subscriber}; use super::{Receivers, StreamSettings, Subscriber};
use crate::output_processors::{RecordType, Runtime}; use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
@ -49,8 +50,8 @@ where
type Settings = NaiveSettings; type Settings = NaiveSettings;
fn new( fn new(
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: crossbeam::channel::Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
@ -86,18 +87,22 @@ where
fn run(self) -> anyhow::Result<()> { fn run(self) -> anyhow::Result<()> {
loop { loop {
crossbeam::select! { crossbeam::select! {
recv(self.recvs.stop_rx) -> _ => { recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta // collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?; self.sink(Arc::new(R::from(Runtime::load()?)))?;
break;
finish_tx?.send(())?
} }
recv(self.recvs.recv) -> msg => { recv(self.recvs.recv) -> msg => {
self.sink(msg?)?; self.sink(msg?)?;
} }
} }
} }
Ok(())
} }
fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> { fn sink(&self, state: Arc<Self::Record>) -> anyhow::Result<()> {

View File

@ -1,5 +1,6 @@
use super::{Receivers, StreamSettings}; use super::{Receivers, StreamSettings};
use crate::output_processors::{RecordType, Runtime}; use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use parking_lot::Mutex; use parking_lot::Mutex;
use polars::prelude::*; use polars::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -97,8 +98,8 @@ where
type Settings = PolarsSettings; type Settings = PolarsSettings;
fn new( fn new(
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: crossbeam::channel::Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
@ -137,9 +138,16 @@ where
fn run(self) -> anyhow::Result<()> { fn run(self) -> anyhow::Result<()> {
loop { loop {
crossbeam::select! { crossbeam::select! {
recv(self.recvs.stop_rx) -> _ => { recv(self.recvs.stop_rx) -> finish_tx => {
// Flush remaining messages after stop signal.
while let Ok(msg) = self.recvs.recv.try_recv() {
self.sink(msg)?;
}
// collect the run time meta // collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?; self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
return self.persist(); return self.persist();
} }
recv(self.recvs.recv) -> msg => { recv(self.recvs.recv) -> msg => {

View File

@ -1,5 +1,6 @@
use super::{Receivers, Subscriber}; use super::{Receivers, Subscriber};
use crate::output_processors::{RecordType, Runtime}; use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
@ -37,8 +38,8 @@ where
type Settings = RuntimeSettings; type Settings = RuntimeSettings;
fn new( fn new(
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: crossbeam::channel::Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
@ -73,9 +74,10 @@ where
fn run(self) -> anyhow::Result<()> { fn run(self) -> anyhow::Result<()> {
crossbeam::select! { crossbeam::select! {
recv(self.recvs.stop_rx) -> _ => { recv(self.recvs.stop_rx) -> finish_tx => {
// collect the run time meta // collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?; self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
} }
recv(self.recvs.recv) -> msg => { recv(self.recvs.recv) -> msg => {
self.sink(msg?)?; self.sink(msg?)?;

View File

@ -1,5 +1,6 @@
use super::{Receivers, Subscriber}; use super::{Receivers, Subscriber};
use crate::output_processors::{RecordType, Runtime}; use crate::output_processors::{RecordType, Runtime};
use crossbeam::channel::{Receiver, Sender};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
fs::{File, OpenOptions}, fs::{File, OpenOptions},
@ -37,8 +38,8 @@ where
type Settings = SettingsSubscriberSettings; type Settings = SettingsSubscriberSettings;
fn new( fn new(
record_recv: crossbeam::channel::Receiver<Arc<Self::Record>>, record_recv: Receiver<Arc<Self::Record>>,
stop_recv: crossbeam::channel::Receiver<()>, stop_recv: Receiver<Sender<()>>,
settings: Self::Settings, settings: Self::Settings,
) -> anyhow::Result<Self> ) -> anyhow::Result<Self>
where where
@ -73,9 +74,10 @@ where
fn run(self) -> anyhow::Result<()> { fn run(self) -> anyhow::Result<()> {
crossbeam::select! { crossbeam::select! {
recv(self.recvs.stop_rx) -> _ => { recv(self.recvs.stop_rx) -> finish_tx => {
// collect the run time meta // collect the run time meta
self.sink(Arc::new(R::from(Runtime::load()?)))?; self.sink(Arc::new(R::from(Runtime::load()?)))?;
finish_tx?.send(())?;
} }
recv(self.recvs.recv) -> msg => { recv(self.recvs.recv) -> msg => {
self.sink(msg?)?; self.sink(msg?)?;