Implement concrete error

This commit is contained in:
Al Liu 2023-11-05 21:23:15 +08:00
parent 2f708060f7
commit 3cbfc64eca
15 changed files with 2004 additions and 58 deletions

1944
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@ use async_trait::async_trait;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{NoMessage, OutboundRelay};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::channel;
@ -29,11 +29,11 @@ impl ServiceData for ChatService {
#[async_trait]
impl ServiceCore for ChatService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
mut service_state, ..
} = self;

View File

@ -3,7 +3,7 @@ use async_trait::async_trait;
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::fmt::Debug;
use tokio::sync::mpsc::Sender;
@ -49,7 +49,7 @@ impl<I: NetworkBackend + Send + 'static> ServiceData for NetworkService<I> {
#[async_trait]
impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
implem: <I as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings(),
@ -58,7 +58,7 @@ impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
service_state,
mut implem,

View File

@ -152,17 +152,15 @@ fn generate_new_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStr
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
quote! {
#field_identifier: {
let manager =
::overwatch_rs::services::handle::ServiceHandle::<#service_type>::new(
#settings_field_identifier, overwatch_handle.clone(),
)?;
manager
::overwatch_rs::services::handle::ServiceHandle::<#service_type>::new(
#settings_field_identifier, overwatch_handle.clone(),
).map_err(::overwatch_rs::overwatch::Error::any)?
}
}
});
quote! {
fn new(settings: Self::Settings, overwatch_handle: ::overwatch_rs::overwatch::handle::OverwatchHandle) -> ::std::result::Result<Self, ::overwatch_rs::DynError> {
fn new(settings: Self::Settings, overwatch_handle: ::overwatch_rs::overwatch::handle::OverwatchHandle) -> ::std::result::Result<Self, ::overwatch_rs::overwatch::Error> {
let Self::Settings {
#( #fields_settings ),*
} = settings;
@ -187,7 +185,7 @@ fn generate_start_all_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::To
quote! {
#[::tracing::instrument(skip(self), err)]
fn start_all(&mut self) -> Result<::overwatch_rs::overwatch::ServicesLifeCycleHandle, ::overwatch_rs::overwatch::Error> {
::std::result::Result::Ok([#( #call_start ),*].try_into()?)
[#( #call_start ),*].try_into()
}
}
}

View File

@ -1,14 +1,12 @@
// std
use std::borrow::Cow;
use std::collections::HashMap;
use std::default::Default;
use std::error::Error;
// crates
use tokio::sync::broadcast::Sender;
// internal
use crate::services::life_cycle::{FinishedSignal, LifecycleHandle, LifecycleMessage};
use crate::services::ServiceId;
use crate::DynError;
use crate::services::{ServiceError, ServiceId};
// use crate::DynError;
/// Grouper handle for the `LifecycleHandle` of each spawned service.
#[derive(Clone)]
@ -34,11 +32,11 @@ impl ServicesLifeCycleHandle {
&self,
service: ServiceId,
sender: Sender<FinishedSignal>,
) -> Result<(), DynError> {
) -> Result<(), ServiceError> {
self.handlers
.get(service)
.unwrap()
.send(LifecycleMessage::Shutdown(sender))?;
.send(service, LifecycleMessage::Shutdown(sender))?;
Ok(())
}
@ -47,15 +45,15 @@ impl ServicesLifeCycleHandle {
/// # Arguments
///
/// `service` - The `ServiceId` of the target service
pub fn kill(&self, service: ServiceId) -> Result<(), DynError> {
pub fn kill(&self, service: ServiceId) -> Result<(), ServiceError> {
self.handlers
.get(service)
.unwrap()
.send(LifecycleMessage::Kill)
.send(service, LifecycleMessage::Kill)
}
/// Send a `Kill` message to all services registered in this handle
pub fn kill_all(&self) -> Result<(), DynError> {
pub fn kill_all(&self) -> Result<(), ServiceError> {
for service_id in self.services_ids() {
self.kill(service_id)?;
}
@ -70,15 +68,13 @@ impl ServicesLifeCycleHandle {
impl<const N: usize> TryFrom<[(ServiceId, LifecycleHandle); N]> for ServicesLifeCycleHandle {
// TODO: On errors refactor extract into a concrete error type with `thiserror`
type Error = Box<dyn Error + Send + Sync>;
type Error = super::Error;
fn try_from(value: [(ServiceId, LifecycleHandle); N]) -> Result<Self, Self::Error> {
let mut handlers = HashMap::new();
for (service_id, handle) in value {
if handlers.contains_key(service_id) {
return Err(Box::<dyn Error + Send + Sync>::from(Cow::Owned(format!(
"Duplicated serviceId: {service_id}"
))));
return Err(super::Error::DuplicatedServiceId { service_id });
}
handlers.insert(service_id, handle);
}

View File

@ -38,6 +38,9 @@ pub enum Error {
#[error("Service {service_id} is unavailable")]
Unavailable { service_id: ServiceId },
#[error("Service id must be unique, but find a duplicated id: {service_id}")]
DuplicatedServiceId { service_id: ServiceId },
#[error(transparent)]
Any(super::DynError),
}
@ -75,7 +78,7 @@ pub trait Services: Sized {
fn new(
settings: Self::Settings,
overwatch_handle: OverwatchHandle,
) -> std::result::Result<Self, super::DynError>;
) -> std::result::Result<Self, Error>;
/// Start a services attached to the trait implementer
fn start(&mut self, service_id: ServiceId) -> Result<(), Error>;
@ -120,7 +123,7 @@ where
pub fn run(
settings: S::Settings,
runtime: Option<Runtime>,
) -> std::result::Result<Overwatch, super::DynError> {
) -> std::result::Result<Overwatch, Error> {
let runtime = runtime.unwrap_or_else(default_multithread_runtime);
let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel();
@ -162,7 +165,7 @@ where
msg: LifecycleMessage::Shutdown(channel),
} => {
if let Err(e) = lifecycle_handlers.shutdown(service_id, channel) {
error!(e);
error!(%e);
}
}
ServiceLifeCycleCommand {
@ -170,7 +173,7 @@ where
msg: LifecycleMessage::Kill,
} => {
if let Err(e) = lifecycle_handlers.kill(service_id) {
error!(e);
error!(%e);
}
}
},
@ -180,7 +183,7 @@ where
OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown
) {
if let Err(e) = lifecycle_handlers.kill_all() {
error!(e);
error!(%e);
}
break;
}
@ -283,7 +286,7 @@ mod test {
fn new(
_settings: Self::Settings,
_overwatch_handle: OverwatchHandle,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> Result<Self, Error> {
Ok(EmptyServices)
}

View File

@ -128,7 +128,7 @@ where
/// Spawn the service main loop and handle it lifecycle
/// Return a handle to abort execution manually
pub fn run(self) -> Result<(ServiceId, LifecycleHandle), crate::DynError> {
pub fn run(self) -> Result<(ServiceId, LifecycleHandle), super::ServiceError> {
let ServiceRunner {
service_state,
state_handle,

View File

@ -1,10 +1,10 @@
use crate::DynError;
use futures::Stream;
use std::default::Default;
use std::error::Error;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio_stream::StreamExt;
use super::ServiceId;
/// Type alias for an empty signal
pub type FinishedSignal = ();
@ -58,11 +58,11 @@ impl LifecycleHandle {
}
/// Send a `LifecycleMessage` to the service
pub fn send(&self, msg: LifecycleMessage) -> Result<(), DynError> {
pub fn send(&self, id: ServiceId, msg: LifecycleMessage) -> Result<(), super::ServiceError> {
self.notifier
.send(msg)
.map(|_| ())
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync + 'static>)
.map_err(|_| super::ServiceError::NotifierClosed(id))
}
}

View File

@ -43,16 +43,22 @@ pub trait ServiceData {
#[async_trait]
pub trait ServiceCore: Sized + ServiceData {
/// Initialize the service with the given state
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, super::DynError>;
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError>;
/// Service main loop
async fn run(mut self) -> Result<(), super::DynError>;
async fn run(mut self) -> Result<(), ServiceError>;
}
#[derive(Error, Debug)]
pub enum ServiceError {
#[error(transparent)]
RelayError(#[from] RelayError),
#[error("{0}'s notifier closed")]
NotifierClosed(ServiceId),
#[error(transparent)]
Service(Box<dyn std::error::Error + Send + Sync + 'static>),
}
pub enum ServiceRuntime {

View File

@ -87,7 +87,7 @@ impl<T> Clone for NoState<T> {
impl<Settings> ServiceState for NoState<Settings> {
type Settings = Settings;
type Error = crate::DynError;
type Error = std::convert::Infallible;
fn from_settings(_settings: &Self::Settings) -> Result<Self, Self::Error> {
Ok(Self(Default::default()))
@ -218,8 +218,8 @@ mod test {
impl ServiceState for UsizeCounter {
type Settings = ();
type Error = crate::DynError;
fn from_settings(_settings: &Self::Settings) -> Result<Self, crate::DynError> {
type Error = std::convert::Infallible;
fn from_settings(_settings: &Self::Settings) -> Result<Self, Self::Error> {
Ok(Self(0))
}
}

View File

@ -5,8 +5,7 @@ use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::NoMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::time::Duration;
use tokio::time::sleep;
use tokio_stream::StreamExt;
@ -25,11 +24,11 @@ impl ServiceData for CancellableService {
#[async_trait::async_trait]
impl ServiceCore for CancellableService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
async fn run(self) -> Result<(), ServiceError> {
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
let mut interval = tokio::time::interval(Duration::from_millis(200));
loop {

View File

@ -5,7 +5,7 @@ use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::fmt::Debug;
use std::time::Duration;
use tokio::time::sleep;
@ -39,14 +39,14 @@ impl<T: Send> ServiceCore for GenericService<T>
where
T: Debug + 'static + Sync,
{
fn init(state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
state,
_phantom: std::marker::PhantomData,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
use tokio::io::{self, AsyncWriteExt};
let Self {

View File

@ -5,7 +5,7 @@ use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::time::Duration;
use tokio::time::sleep;
@ -28,11 +28,11 @@ impl ServiceData for PrintService {
#[async_trait]
impl ServiceCore for PrintService {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { state })
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
use tokio::io::{self, AsyncWriteExt};
let Self {

View File

@ -4,7 +4,7 @@ use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::time::Duration;
use tokio::time::sleep;
@ -29,11 +29,11 @@ impl ServiceData for SettingsService {
#[async_trait]
impl ServiceCore for SettingsService {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { state })
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
state: ServiceStateHandle {
settings_reader, ..

View File

@ -4,7 +4,7 @@ use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{ServiceState, StateOperator};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use std::time::Duration;
use tokio::io::{self, AsyncWriteExt};
use tokio::time::sleep;
@ -75,11 +75,11 @@ impl ServiceData for UpdateStateService {
#[async_trait]
impl ServiceCore for UpdateStateService {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { state })
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
state: ServiceStateHandle { state_updater, .. },
} = self;