Plug services lifecycle (Shutdown and Kill) (#27)

* Plug lifecycle channel

* Pipe lifecycle to services

* Add explanatory doc comment on handle clone

* Fix tests

* Add missing break on overwatch kill

* Add shutdown service test

* Make clippy happy

* Use try_from instead of from to check for duplicated service ids in ServicesLifeCycleHandle

* Add docs
Daniel Sanchez 2023-10-09 14:00:12 +02:00 committed by GitHub
parent 6e6678b0e4
commit ac28d01158
10 changed files with 314 additions and 47 deletions
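
At a glance: every service now receives a LifecycleHandle through its ServiceStateHandle and is expected to poll it alongside its regular work. A minimal sketch of the intended service-side pattern, condensed from the new integration test at the bottom of this diff (the enclosing ServiceCore::run body and imports, notably tokio_stream::StreamExt, are elided):

// Inside `ServiceCore::run`, where `self.service_state` is the usual
// `ServiceStateHandle<Self>` (sketch; see the full test below):
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
loop {
    tokio::select! {
        Some(msg) = lifecycle_stream.next() => match msg {
            // `Shutdown` carries a broadcast sender: ack through it so the
            // requester can observe that the shutdown sequence finished.
            LifecycleMessage::Shutdown(reply) => {
                reply.send(()).expect("shutdown ack to be delivered");
                break;
            }
            // `Kill` carries nothing: just bail out.
            LifecycleMessage::Kill => break,
        },
        // ... the service's actual work goes in the other select arms ...
    }
}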

Cargo.toml

@@ -1,5 +1,5 @@
 [workspace]
+resolver = "2"
 members = [
     "overwatch-rs",
     "overwatch-derive",

overwatch-derive/src/lib.rs

@@ -180,16 +180,14 @@ fn generate_start_all_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
     let call_start = fields.iter().map(|field| {
         let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
         quote! {
-            self.#field_identifier.service_runner().run()?;
+            self.#field_identifier.service_runner().run()?
         }
     });

     quote! {
         #[::tracing::instrument(skip(self), err)]
-        fn start_all(&mut self) -> Result<(), ::overwatch_rs::overwatch::Error> {
-            #( #call_start )*
-            ::std::result::Result::Ok(())
+        fn start_all(&mut self) -> Result<::overwatch_rs::overwatch::ServicesLifeCycleHandle, ::overwatch_rs::overwatch::Error> {
+            ::std::result::Result::Ok([#( #call_start ),*].try_into()?)
         }
     }
 }

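For a concrete picture of the macro change: given a hypothetical Services struct with fields foo and bar, the derived start_all now expands to roughly the following sketch (field names are illustrative):

#[::tracing::instrument(skip(self), err)]
fn start_all(
    &mut self,
) -> Result<::overwatch_rs::overwatch::ServicesLifeCycleHandle, ::overwatch_rs::overwatch::Error> {
    // Each runner now returns a `(ServiceId, LifecycleHandle)` pair. Collecting
    // the pairs into an array and calling `try_into` builds the grouper handle,
    // failing on duplicated service ids (see the `TryFrom` impl below).
    ::std::result::Result::Ok([
        self.foo.service_runner().run()?,
        self.bar.service_runner().run()?,
    ]
    .try_into()?)
}
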
overwatch-rs/Cargo.toml

@@ -29,7 +29,7 @@ color-eyre = "0.6"
 async-trait = "0.1"
 futures = "0.3"
 thiserror = "1.0"
-tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
+tokio = { version = "1.32", features = ["rt-multi-thread", "sync", "time"] }
 tokio-stream = {version ="0.1", features = ["sync"] }
 tokio-util = "0.7"
 tracing = "0.1"

overwatch-rs/src/overwatch/commands.rs

@@ -2,6 +2,7 @@
 // crates
 use crate::overwatch::AnySettings;
+use crate::services::life_cycle::LifecycleMessage;
 use tokio::sync::oneshot;
 // internal
@@ -33,18 +34,9 @@ pub struct RelayCommand {
 /// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle
 #[allow(unused)]
 #[derive(Debug)]
-pub struct ServiceLifeCycle<R> {
-    service_id: ServiceId,
-    reply_channel: ReplyChannel<R>,
-}
-
-/// [`ServiceCore`](crate::services::ServiceCore) lifecycle related commands
-#[derive(Debug)]
-pub enum ServiceLifeCycleCommand {
-    Shutdown(ServiceLifeCycle<()>),
-    Kill(ServiceLifeCycle<()>),
-    Start(ServiceLifeCycle<()>),
-    Stop(ServiceLifeCycle<()>),
+pub struct ServiceLifeCycleCommand {
+    pub service_id: ServiceId,
+    pub msg: LifecycleMessage,
 }

 /// [`Overwatch`](crate::overwatch::Overwatch) lifecycle related commands

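With the generic ServiceLifeCycle<R> wrapper and its four-variant enum gone, a lifecycle request is now just a service id plus a LifecycleMessage. A sketch of sending one through an OverwatchHandle, assuming a hypothetical MyService implementing ServiceData (this mirrors the new integration test at the end of the diff):

use overwatch_rs::overwatch::commands::{OverwatchCommand, ServiceLifeCycleCommand};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::ServiceData;

// `handle` is an `OverwatchHandle`; the broadcast channel carries the
// `FinishedSignal` ack back from the service.
let (sender, mut receiver) = tokio::sync::broadcast::channel(1);
handle
    .send(OverwatchCommand::ServiceLifeCycle(ServiceLifeCycleCommand {
        service_id: <MyService as ServiceData>::SERVICE_ID,
        msg: LifecycleMessage::Shutdown(sender),
    }))
    .await;
// The service acks once it has finished shutting down.
receiver.recv().await.expect("finished signal");
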
overwatch-rs/src/overwatch/life_cycle.rs (new file)

@@ -0,0 +1,87 @@
+// std
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::default::Default;
+use std::error::Error;
+// crates
+use tokio::sync::broadcast::Sender;
+// internal
+use crate::services::life_cycle::{FinishedSignal, LifecycleHandle, LifecycleMessage};
+use crate::services::ServiceId;
+use crate::DynError;
+
+/// Grouper handle for the `LifecycleHandle` of each spawned service.
+#[derive(Clone)]
+pub struct ServicesLifeCycleHandle {
+    handlers: HashMap<ServiceId, LifecycleHandle>,
+}
+
+impl ServicesLifeCycleHandle {
+    pub fn empty() -> Self {
+        Self {
+            handlers: Default::default(),
+        }
+    }
+
+    /// Send a `Shutdown` message to the specified service
+    ///
+    /// # Arguments
+    ///
+    /// `service` - The `ServiceId` of the target service
+    /// `sender` - The sender side of a broadcast channel. A return signal is sent through it
+    /// once the message has been handled.
+    pub fn shutdown(
+        &self,
+        service: ServiceId,
+        sender: Sender<FinishedSignal>,
+    ) -> Result<(), DynError> {
+        self.handlers
+            .get(service)
+            .unwrap()
+            .send(LifecycleMessage::Shutdown(sender))?;
+        Ok(())
+    }
+
+    /// Send a `Kill` message to the specified service (`ServiceId`)
+    ///
+    /// # Arguments
+    ///
+    /// `service` - The `ServiceId` of the target service
+    pub fn kill(&self, service: ServiceId) -> Result<(), DynError> {
+        self.handlers
+            .get(service)
+            .unwrap()
+            .send(LifecycleMessage::Kill)
+    }
+
+    /// Send a `Kill` message to all services registered in this handle
+    pub fn kill_all(&self) -> Result<(), DynError> {
+        for service_id in self.services_ids() {
+            self.kill(service_id)?;
+        }
+        Ok(())
+    }
+
+    /// Get all service ids registered in this handle
+    pub fn services_ids(&self) -> impl Iterator<Item = ServiceId> + '_ {
+        self.handlers.keys().copied()
+    }
+}
+
+impl<const N: usize> TryFrom<[(ServiceId, LifecycleHandle); N]> for ServicesLifeCycleHandle {
+    // TODO: Refactor errors into a concrete error type with `thiserror`
+    type Error = Box<dyn Error + Send + Sync>;
+
+    fn try_from(value: [(ServiceId, LifecycleHandle); N]) -> Result<Self, Self::Error> {
+        let mut handlers = HashMap::new();
+        for (service_id, handle) in value {
+            if handlers.contains_key(service_id) {
+                return Err(Box::<dyn Error + Send + Sync>::from(Cow::Owned(format!(
+                    "Duplicated serviceId: {service_id}"
+                ))));
+            }
+            handlers.insert(service_id, handle);
+        }
+        Ok(Self { handlers })
+    }
+}

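The TryFrom impl above is what backs the derived start_all. A small standalone sketch of the error path it guards against (ids are illustrative; ServiceId is a &'static str):

use overwatch_rs::overwatch::ServicesLifeCycleHandle;
use overwatch_rs::services::life_cycle::LifecycleHandle;

fn main() {
    // Two handles registered under the same id: `try_from` refuses to build
    // the grouper handle instead of silently overwriting an entry, which is
    // why the derive macro uses `try_into` rather than `into`.
    let dup = ServicesLifeCycleHandle::try_from([
        ("same-id", LifecycleHandle::new()),
        ("same-id", LifecycleHandle::new()),
    ]);
    assert!(dup.is_err());
}
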
overwatch-rs/src/overwatch/mod.rs

@ -1,5 +1,6 @@
pub mod commands; pub mod commands;
pub mod handle; pub mod handle;
pub mod life_cycle;
// std // std
use std::any::Any; use std::any::Any;
@ -14,14 +15,16 @@ use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tracing::{info, instrument}; use tracing::{error, info, instrument};
// internal // internal
use crate::overwatch::commands::{ use crate::overwatch::commands::{
OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, SettingsCommand, OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, ServiceLifeCycleCommand,
SettingsCommand,
}; };
use crate::overwatch::handle::OverwatchHandle; use crate::overwatch::handle::OverwatchHandle;
pub use crate::overwatch::life_cycle::ServicesLifeCycleHandle;
use crate::services::life_cycle::LifecycleMessage;
use crate::services::relay::RelayResult; use crate::services::relay::RelayResult;
use crate::services::{ServiceError, ServiceId}; use crate::services::{ServiceError, ServiceId};
use crate::utils::runtime::default_multithread_runtime; use crate::utils::runtime::default_multithread_runtime;
@ -79,7 +82,7 @@ pub trait Services: Sized {
// TODO: this probably will be removed once the services lifecycle is implemented // TODO: this probably will be removed once the services lifecycle is implemented
/// Start all services attached to the trait implementer /// Start all services attached to the trait implementer
fn start_all(&mut self) -> Result<(), Error>; fn start_all(&mut self) -> Result<ServicesLifeCycleHandle, Error>;
/// Stop a service attached to the trait implementer /// Stop a service attached to the trait implementer
fn stop(&mut self, service_id: ServiceId) -> Result<(), Error>; fn stop(&mut self, service_id: ServiceId) -> Result<(), Error>;
@ -124,12 +127,20 @@ where
let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16); let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16);
let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender); let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender);
let services = S::new(settings, handle.clone())?; let services = S::new(settings, handle.clone())?;
let runner = OverwatchRunner { let mut runner = OverwatchRunner {
services, services,
handle: handle.clone(), handle: handle.clone(),
finish_signal_sender, finish_signal_sender,
}; };
runtime.spawn(async move { runner.run_(commands_receiver).await });
let lifecycle_handlers = runner.services.start_all()?;
runtime.spawn(async move {
runner
.run_(commands_receiver, lifecycle_handlers.clone())
.await
});
Ok(Overwatch { Ok(Overwatch {
runtime, runtime,
handle, handle,
@ -138,28 +149,48 @@ where
} }
#[instrument(name = "overwatch-run", skip_all)] #[instrument(name = "overwatch-run", skip_all)]
async fn run_(self, mut receiver: Receiver<OverwatchCommand>) { async fn run_(
self,
mut receiver: Receiver<OverwatchCommand>,
lifecycle_handlers: ServicesLifeCycleHandle,
) {
let Self { let Self {
mut services, mut services,
handle: _, handle: _,
finish_signal_sender, finish_signal_sender,
} = self; } = self;
// TODO: this probably need to be manually done, or at least handled by a flag
services.start_all().expect("Services to start running");
while let Some(command) = receiver.recv().await { while let Some(command) = receiver.recv().await {
info!(command = ?command, "Overwatch command received"); info!(command = ?command, "Overwatch command received");
match command { match command {
OverwatchCommand::Relay(relay_command) => { OverwatchCommand::Relay(relay_command) => {
Self::handle_relay(&mut services, relay_command).await; Self::handle_relay(&mut services, relay_command).await;
} }
OverwatchCommand::ServiceLifeCycle(_) => { OverwatchCommand::ServiceLifeCycle(msg) => match msg {
unimplemented!("Services life cycle is still not supported!"); ServiceLifeCycleCommand {
} service_id,
msg: LifecycleMessage::Shutdown(channel),
} => {
if let Err(e) = lifecycle_handlers.shutdown(service_id, channel) {
error!(e);
}
}
ServiceLifeCycleCommand {
service_id,
msg: LifecycleMessage::Kill,
} => {
if let Err(e) = lifecycle_handlers.kill(service_id) {
error!(e);
}
}
},
OverwatchCommand::OverwatchLifeCycle(command) => { OverwatchCommand::OverwatchLifeCycle(command) => {
if matches!( if matches!(
command, command,
OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown
) { ) {
if let Err(e) = lifecycle_handlers.kill_all() {
error!(e);
}
break; break;
} }
} }
@ -216,7 +247,7 @@ impl Overwatch {
&self.handle &self.handle
} }
/// Get the underllaying tokio runtime handle /// Get the underlaying tokio runtime handle
pub fn runtime(&self) -> &Handle { pub fn runtime(&self) -> &Handle {
self.runtime.handle() self.runtime.handle()
} }
@ -247,7 +278,7 @@ impl Overwatch {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::overwatch::handle::OverwatchHandle; use crate::overwatch::handle::OverwatchHandle;
use crate::overwatch::{Error, OverwatchRunner, Services}; use crate::overwatch::{Error, OverwatchRunner, Services, ServicesLifeCycleHandle};
use crate::services::relay::{RelayError, RelayResult}; use crate::services::relay::{RelayError, RelayResult};
use crate::services::ServiceId; use crate::services::ServiceId;
use std::time::Duration; use std::time::Duration;
@ -269,8 +300,8 @@ mod test {
Err(Error::Unavailable { service_id }) Err(Error::Unavailable { service_id })
} }
fn start_all(&mut self) -> Result<(), Error> { fn start_all(&mut self) -> Result<ServicesLifeCycleHandle, Error> {
Ok(()) Ok(ServicesLifeCycleHandle::empty())
} }
fn stop(&mut self, service_id: ServiceId) -> Result<(), Error> { fn stop(&mut self, service_id: ServiceId) -> Result<(), Error> {

overwatch-rs/src/services/handle.rs

@@ -1,14 +1,14 @@
 // crates
-use futures::future::{abortable, AbortHandle};
 use tokio::runtime::Handle;
 // internal
 use crate::overwatch::handle::OverwatchHandle;
+use crate::services::life_cycle::LifecycleHandle;
 use crate::services::relay::{relay, InboundRelay, OutboundRelay};
 use crate::services::settings::{SettingsNotifier, SettingsUpdater};
 use crate::services::state::{StateHandle, StateOperator, StateUpdater};
 use crate::services::{ServiceCore, ServiceData, ServiceId, ServiceState};

-// TODO: Abstract handle over state, to diferentiate when the service is running and when it is not
+// TODO: Abstract handle over state, to differentiate when the service is running and when it is not
 // that way we can expose a better API depending on what is happenning. Would get rid of the probably
 // unnecessary Option and cloning.
 /// Service handle
@@ -33,7 +33,7 @@ pub struct ServiceStateHandle<S: ServiceData> {
     pub overwatch_handle: OverwatchHandle,
     pub settings_reader: SettingsNotifier<S::Settings>,
     pub state_updater: StateUpdater<S::State>,
-    pub _lifecycle_handler: (),
+    pub lifecycle_handle: LifecycleHandle,
 }

 /// Main service executor
@@ -41,6 +41,7 @@ pub struct ServiceStateHandle<S: ServiceData> {
 pub struct ServiceRunner<S: ServiceData> {
     service_state: ServiceStateHandle<S>,
     state_handle: StateHandle<S::State, S::StateOperator>,
+    lifecycle_handle: LifecycleHandle,
 }

 impl<S: ServiceData> ServiceHandle<S> {
@@ -94,17 +95,20 @@ impl<S: ServiceData> ServiceHandle<S> {
         let (state_handle, state_updater) =
             StateHandle::<S::State, S::StateOperator>::new(self.initial_state.clone(), operator);
+        let lifecycle_handle = LifecycleHandle::new();
+
         let service_state = ServiceStateHandle {
             inbound_relay,
             overwatch_handle: self.overwatch_handle.clone(),
             state_updater,
             settings_reader,
-            _lifecycle_handler: (),
+            lifecycle_handle: lifecycle_handle.clone(),
         };

         ServiceRunner {
             service_state,
             state_handle,
+            lifecycle_handle,
         }
     }
 }
@@ -124,22 +128,19 @@ where
     /// Spawn the service main loop and handle it lifecycle
     /// Return a handle to abort execution manually
-    pub fn run(self) -> Result<AbortHandle, crate::DynError> {
+    pub fn run(self) -> Result<(ServiceId, LifecycleHandle), crate::DynError> {
         let ServiceRunner {
             service_state,
             state_handle,
-            ..
+            lifecycle_handle,
         } = self;

         let runtime = service_state.overwatch_handle.runtime().clone();
         let service = S::init(service_state)?;
-        let (runner, abortable_handle) = abortable(service.run());
-        runtime.spawn(runner);
+        runtime.spawn(service.run());
         runtime.spawn(state_handle.run());
-        // TODO: Handle service lifecycle
-        // TODO: this handle should not scape this scope, it should actually be handled in the lifecycle part mentioned above
-        Ok(abortable_handle)
+        Ok((S::SERVICE_ID, lifecycle_handle))
     }
 }

overwatch-rs/src/services/life_cycle.rs

@ -1 +1,73 @@
use crate::DynError;
use futures::Stream;
use std::default::Default;
use std::error::Error;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio_stream::StreamExt;
/// Type alias for an empty signal
pub type FinishedSignal = ();
/// Supported lifecycle messages
#[derive(Clone, Debug)]
pub enum LifecycleMessage {
/// Shutdown
/// Hold a sender from a broadcast channel. It is intended to signal when finished handling the
/// shutdown process.
Shutdown(Sender<FinishedSignal>),
/// Kill
/// Well, nothing much to explain here, everything should be about to be nuked.
Kill,
}
/// Handle for lifecycle communications with a `Service`
pub struct LifecycleHandle {
message_channel: Receiver<LifecycleMessage>,
notifier: Sender<LifecycleMessage>,
}
impl Clone for LifecycleHandle {
fn clone(&self) -> Self {
Self {
// `resubscribe` gives us access just to newly produced event not already enqueued ones
// that is fine, as at any point missing signals means you were not interested in the moment
// it was produced and most probably whatever holding the handle was not even alive.
message_channel: self.message_channel.resubscribe(),
notifier: self.notifier.clone(),
}
}
}
impl LifecycleHandle {
pub fn new() -> Self {
// Use a single lifecycle message at a time. Idea is that all computations on lifecycle should
// stack so waiting es effective even if later on is somehow reversed (for example for start/stop events).
let (notifier, message_channel) = channel(1);
Self {
notifier,
message_channel,
}
}
/// Incoming lifecycle message stream
/// Notice that messages are not buffered. So, different calls to this method could yield different
/// incoming messages depending the timing of call.
pub fn message_stream(&self) -> impl Stream<Item = LifecycleMessage> {
tokio_stream::wrappers::BroadcastStream::new(self.message_channel.resubscribe())
.filter_map(Result::ok)
}
/// Send a `LifecycleMessage` to the service
pub fn send(&self, msg: LifecycleMessage) -> Result<(), DynError> {
self.notifier
.send(msg)
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync + 'static>)
}
}
impl Default for LifecycleHandle {
fn default() -> Self {
Self::new()
}
}

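The capacity-1 broadcast channel plus resubscribe-on-clone means an observer only sees messages sent after it subscribes. A standalone sketch of the round trip (assumes tokio with the rt feature; the handle's own receiver keeps the channel alive, so send cannot fail here):

use overwatch_rs::services::life_cycle::{LifecycleHandle, LifecycleMessage};
use tokio_stream::StreamExt;

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let handle = LifecycleHandle::new();
        // Subscribe first: a stream created after `send` would miss the
        // message, since `resubscribe` only yields later events.
        let mut stream = handle.message_stream();
        handle.send(LifecycleMessage::Kill).expect("channel alive");
        assert!(matches!(stream.next().await, Some(LifecycleMessage::Kill)));
    });
}
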
overwatch-rs/src/services/relay.rs

@@ -189,7 +189,7 @@ impl<S: ServiceData> Relay<S> {
             Ok(Ok(message)) => match message.downcast::<OutboundRelay<S::Message>>() {
                 Ok(channel) => Ok(*channel),
                 Err(m) => Err(RelayError::InvalidMessage {
-                    type_id: format!("{:?}", m.type_id()),
+                    type_id: format!("{:?}", (*m).type_id()),
                     service_id: S::SERVICE_ID,
                 }),
             },

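The one-line relay fix deserves a note: on a Box<dyn Any>, method resolution finds the box's own Any impl (boxes are 'static) before auto-deref, so m.type_id() reports the TypeId of the box rather than of the boxed message. Dereferencing first dispatches through the vtable to the payload. A standalone sketch of the footgun:

use std::any::{Any, TypeId};

fn main() {
    let m: Box<dyn Any> = Box::new(42_u32);
    // Resolves on the box itself: the TypeId of `Box<dyn Any>`.
    let of_box = m.type_id();
    // Deref first: dynamic dispatch to the payload, the TypeId of `u32`.
    let of_payload = (*m).type_id();
    assert_ne!(of_box, of_payload);
    assert_eq!(of_payload, TypeId::of::<u32>());
    assert_eq!(of_box, TypeId::of::<Box<dyn Any>>());
}
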
overwatch-rs/tests/ (new integration test)

@@ -0,0 +1,86 @@
+use overwatch_derive::Services;
+use overwatch_rs::overwatch::commands::{OverwatchCommand, ServiceLifeCycleCommand};
+use overwatch_rs::overwatch::OverwatchRunner;
+use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
+use overwatch_rs::services::life_cycle::LifecycleMessage;
+use overwatch_rs::services::relay::NoMessage;
+use overwatch_rs::services::state::{NoOperator, NoState};
+use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
+use overwatch_rs::DynError;
+use std::time::Duration;
+use tokio::time::sleep;
+use tokio_stream::StreamExt;
+
+pub struct CancellableService {
+    service_state: ServiceStateHandle<Self>,
+}
+
+impl ServiceData for CancellableService {
+    const SERVICE_ID: ServiceId = "cancel-me-please";
+    type Settings = ();
+    type State = NoState<Self::Settings>;
+    type StateOperator = NoOperator<Self::State>;
+    type Message = NoMessage;
+}
+
+#[async_trait::async_trait]
+impl ServiceCore for CancellableService {
+    fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
+        Ok(Self { service_state })
+    }
+
+    async fn run(self) -> Result<(), DynError> {
+        let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
+        let mut interval = tokio::time::interval(Duration::from_millis(200));
+        loop {
+            tokio::select! {
+                msg = lifecycle_stream.next() => {
+                    match msg {
+                        Some(LifecycleMessage::Shutdown(reply)) => {
+                            reply.send(()).unwrap();
+                            break;
+                        }
+                        Some(LifecycleMessage::Kill) => {
+                            break;
+                        }
+                        _ => {
+                            unimplemented!();
+                        }
+                    }
+                }
+                _ = interval.tick() => {
+                    println!("Waiting to be killed 💀");
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Services)]
+struct CancelableServices {
+    cancelable: ServiceHandle<CancellableService>,
+}
+
+#[test]
+fn run_overwatch_then_shutdown_service_and_kill() {
+    let settings = CancelableServicesServiceSettings { cancelable: () };
+    let overwatch = OverwatchRunner::<CancelableServices>::run(settings, None).unwrap();
+    let handle = overwatch.handle().clone();
+    let (sender, mut receiver) = tokio::sync::broadcast::channel(1);
+    overwatch.spawn(async move {
+        sleep(Duration::from_millis(500)).await;
+        handle
+            .send(OverwatchCommand::ServiceLifeCycle(
+                ServiceLifeCycleCommand {
+                    service_id: <CancellableService as ServiceData>::SERVICE_ID,
+                    msg: LifecycleMessage::Shutdown(sender),
+                },
+            ))
+            .await;
+        // wait for the service to finish
+        receiver.recv().await.unwrap();
+        handle.kill().await;
+    });
+    overwatch.wait_finished();
+}