Remove multiple runtimes support from API (#5)
This commit is contained in:
parent
7124639b60
commit
bc771e21ca
|
@ -142,32 +142,26 @@ fn generate_new_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStr
|
||||||
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
|
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
|
||||||
quote! {
|
quote! {
|
||||||
#field_identifier: {
|
#field_identifier: {
|
||||||
let (runtime, manager) =
|
let manager =
|
||||||
::overwatch::services::handle::ServiceHandle::<#service_type>::new(
|
::overwatch::services::handle::ServiceHandle::<#service_type>::new(
|
||||||
#settings_field_identifier, overwatch_handle.clone(),
|
#settings_field_identifier, overwatch_handle.clone(),
|
||||||
);
|
);
|
||||||
runtimes.insert(<#service_type as ::overwatch::services::ServiceData>::SERVICE_ID, runtime);
|
|
||||||
manager
|
manager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
quote! {
|
quote! {
|
||||||
fn new(settings: Self::Settings, overwatch_handle: ::overwatch::overwatch::handle::OverwatchHandle) -> (
|
fn new(settings: Self::Settings, overwatch_handle: ::overwatch::overwatch::handle::OverwatchHandle) -> Self {
|
||||||
std::collections::HashMap<::overwatch::services::ServiceId, ::std::option::Option<::tokio::runtime::Runtime>>,
|
|
||||||
Self
|
|
||||||
) {
|
|
||||||
let Self::Settings {
|
let Self::Settings {
|
||||||
#( #fields_settings ),*
|
#( #fields_settings ),*
|
||||||
} = settings;
|
} = settings;
|
||||||
|
|
||||||
let mut runtimes = ::std::collections::HashMap::new();
|
|
||||||
|
|
||||||
let app = Self {
|
let app = Self {
|
||||||
#( #managers ),*
|
#( #managers ),*
|
||||||
};
|
};
|
||||||
|
|
||||||
(runtimes, app)
|
app
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ pub mod handle;
|
||||||
// std
|
// std
|
||||||
|
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
|
||||||
|
@ -55,10 +54,7 @@ pub trait Services: Send + Sync {
|
||||||
/// It returns a `(ServiceId, Runtime)` where Runtime is the `tokio::runtime::Runtime` attached for each
|
/// It returns a `(ServiceId, Runtime)` where Runtime is the `tokio::runtime::Runtime` attached for each
|
||||||
/// service.
|
/// service.
|
||||||
/// It also returns an instance of the implementing type.
|
/// It also returns an instance of the implementing type.
|
||||||
fn new(
|
fn new(settings: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
|
||||||
settings: Self::Settings,
|
|
||||||
overwatch_handle: OverwatchHandle,
|
|
||||||
) -> (HashMap<ServiceId, Option<Runtime>>, Self);
|
|
||||||
|
|
||||||
/// Start a services attached to the trait implementer
|
/// Start a services attached to the trait implementer
|
||||||
fn start(&mut self, service_id: ServiceId) -> Result<(), Error>;
|
fn start(&mut self, service_id: ServiceId) -> Result<(), Error>;
|
||||||
|
@ -106,7 +102,7 @@ where
|
||||||
let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel();
|
let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel();
|
||||||
let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16);
|
let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16);
|
||||||
let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender);
|
let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender);
|
||||||
let (services_runtimes, services) = S::new(settings, handle.clone());
|
let services = S::new(settings, handle.clone());
|
||||||
let runner = OverwatchRunner {
|
let runner = OverwatchRunner {
|
||||||
services,
|
services,
|
||||||
handle: handle.clone(),
|
handle: handle.clone(),
|
||||||
|
@ -115,7 +111,6 @@ where
|
||||||
runtime.spawn(async move { runner.run_(commands_receiver).await });
|
runtime.spawn(async move { runner.run_(commands_receiver).await });
|
||||||
Overwatch {
|
Overwatch {
|
||||||
runtime,
|
runtime,
|
||||||
services_runtimes,
|
|
||||||
handle,
|
handle,
|
||||||
finish_runner_signal,
|
finish_runner_signal,
|
||||||
}
|
}
|
||||||
|
@ -189,8 +184,6 @@ where
|
||||||
/// It manages the overwatch runtime and handle
|
/// It manages the overwatch runtime and handle
|
||||||
pub struct Overwatch {
|
pub struct Overwatch {
|
||||||
runtime: Runtime,
|
runtime: Runtime,
|
||||||
#[allow(unused)]
|
|
||||||
services_runtimes: HashMap<ServiceId, Option<Runtime>>,
|
|
||||||
handle: OverwatchHandle,
|
handle: OverwatchHandle,
|
||||||
finish_runner_signal: oneshot::Receiver<FinishOverwatchSignal>,
|
finish_runner_signal: oneshot::Receiver<FinishOverwatchSignal>,
|
||||||
}
|
}
|
||||||
|
@ -236,9 +229,7 @@ mod test {
|
||||||
use crate::overwatch::{Error, OverwatchRunner, Services};
|
use crate::overwatch::{Error, OverwatchRunner, Services};
|
||||||
use crate::services::relay::{RelayError, RelayResult};
|
use crate::services::relay::{RelayError, RelayResult};
|
||||||
use crate::services::ServiceId;
|
use crate::services::ServiceId;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::runtime::Runtime;
|
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
struct EmptyServices;
|
struct EmptyServices;
|
||||||
|
@ -246,11 +237,8 @@ mod test {
|
||||||
impl Services for EmptyServices {
|
impl Services for EmptyServices {
|
||||||
type Settings = ();
|
type Settings = ();
|
||||||
|
|
||||||
fn new(
|
fn new(_settings: Self::Settings, _overwatch_handle: OverwatchHandle) -> Self {
|
||||||
_settings: Self::Settings,
|
EmptyServices
|
||||||
_overwatch_handle: OverwatchHandle,
|
|
||||||
) -> (HashMap<ServiceId, Option<Runtime>>, Self) {
|
|
||||||
(HashMap::new(), EmptyServices)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(&mut self, service_id: ServiceId) -> Result<(), Error> {
|
fn start(&mut self, service_id: ServiceId) -> Result<(), Error> {
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
// crates
|
// crates
|
||||||
use futures::future::{abortable, AbortHandle};
|
use futures::future::{abortable, AbortHandle};
|
||||||
use tokio::runtime::{Handle, Runtime};
|
use tokio::runtime::Handle;
|
||||||
use tracing::instrument;
|
use tracing::instrument;
|
||||||
// internal
|
// internal
|
||||||
use crate::overwatch::handle::OverwatchHandle;
|
use crate::overwatch::handle::OverwatchHandle;
|
||||||
|
@ -19,8 +19,6 @@ use crate::services::{ServiceCore, ServiceId, ServiceState};
|
||||||
pub struct ServiceHandle<S: ServiceCore> {
|
pub struct ServiceHandle<S: ServiceCore> {
|
||||||
/// Service id (must match `<ServiceCore::ServiceId>`)
|
/// Service id (must match `<ServiceCore::ServiceId>`)
|
||||||
id: ServiceId,
|
id: ServiceId,
|
||||||
/// Service runtime handle
|
|
||||||
runtime: Handle,
|
|
||||||
/// Message channel relay
|
/// Message channel relay
|
||||||
/// Would be None if service is not running
|
/// Would be None if service is not running
|
||||||
/// Will contain the channel if service is running
|
/// Will contain the channel if service is running
|
||||||
|
@ -35,8 +33,6 @@ pub struct ServiceHandle<S: ServiceCore> {
|
||||||
/// Service core resources
|
/// Service core resources
|
||||||
/// It contains whatever is necessary to start a new service runner
|
/// It contains whatever is necessary to start a new service runner
|
||||||
pub struct ServiceStateHandle<S: ServiceCore> {
|
pub struct ServiceStateHandle<S: ServiceCore> {
|
||||||
/// Service runtime handler
|
|
||||||
pub runtime: Handle,
|
|
||||||
/// Relay channel to communicate with the service runner
|
/// Relay channel to communicate with the service runner
|
||||||
pub inbound_relay: InboundRelay<S::Message>,
|
pub inbound_relay: InboundRelay<S::Message>,
|
||||||
/// Overwatch handle
|
/// Overwatch handle
|
||||||
|
@ -49,37 +45,25 @@ pub struct ServiceStateHandle<S: ServiceCore> {
|
||||||
/// Main service executor
|
/// Main service executor
|
||||||
/// It is the object that hold the necessary information for the service to run
|
/// It is the object that hold the necessary information for the service to run
|
||||||
pub struct ServiceRunner<S: ServiceCore> {
|
pub struct ServiceRunner<S: ServiceCore> {
|
||||||
#[allow(unused)]
|
|
||||||
overwatch_handle: OverwatchHandle,
|
|
||||||
service_state: ServiceStateHandle<S>,
|
service_state: ServiceStateHandle<S>,
|
||||||
state_handle: StateHandle<S::State, S::StateOperator>,
|
state_handle: StateHandle<S::State, S::StateOperator>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: ServiceCore> ServiceHandle<S> {
|
impl<S: ServiceCore> ServiceHandle<S> {
|
||||||
pub fn new(
|
pub fn new(settings: S::Settings, overwatch_handle: OverwatchHandle) -> Self {
|
||||||
settings: S::Settings,
|
|
||||||
overwatch_handle: OverwatchHandle,
|
|
||||||
) -> (Option<Runtime>, Self) {
|
|
||||||
let id = S::SERVICE_ID;
|
let id = S::SERVICE_ID;
|
||||||
let runtime = S::service_runtime(&settings, overwatch_handle.runtime());
|
|
||||||
|
|
||||||
let initial_state: S::State = S::State::from_settings(&settings);
|
let initial_state: S::State = S::State::from_settings(&settings);
|
||||||
|
|
||||||
let handle = runtime.handle();
|
|
||||||
let settings = SettingsUpdater::new(settings);
|
let settings = SettingsUpdater::new(settings);
|
||||||
|
|
||||||
(
|
Self {
|
||||||
runtime.runtime(),
|
id,
|
||||||
Self {
|
outbound_relay: None,
|
||||||
id,
|
settings,
|
||||||
runtime: handle,
|
initial_state,
|
||||||
outbound_relay: None,
|
overwatch_handle,
|
||||||
settings,
|
_marker: PhantomData::default(),
|
||||||
initial_state,
|
}
|
||||||
overwatch_handle,
|
|
||||||
_marker: PhantomData::default(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn id(&self) -> ServiceId {
|
pub fn id(&self) -> ServiceId {
|
||||||
|
@ -89,7 +73,7 @@ impl<S: ServiceCore> ServiceHandle<S> {
|
||||||
/// Service runtime getter
|
/// Service runtime getter
|
||||||
/// it is easily cloneable and can be done on demand
|
/// it is easily cloneable and can be done on demand
|
||||||
pub fn runtime(&self) -> &Handle {
|
pub fn runtime(&self) -> &Handle {
|
||||||
&self.runtime
|
self.overwatch_handle.runtime()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Overwatch handle
|
/// Overwatch handle
|
||||||
|
@ -121,7 +105,6 @@ impl<S: ServiceCore> ServiceHandle<S> {
|
||||||
StateHandle::<S::State, S::StateOperator>::new(self.initial_state.clone(), operator);
|
StateHandle::<S::State, S::StateOperator>::new(self.initial_state.clone(), operator);
|
||||||
|
|
||||||
let service_state = ServiceStateHandle {
|
let service_state = ServiceStateHandle {
|
||||||
runtime: self.runtime.clone(),
|
|
||||||
inbound_relay,
|
inbound_relay,
|
||||||
overwatch_handle: self.overwatch_handle.clone(),
|
overwatch_handle: self.overwatch_handle.clone(),
|
||||||
state_updater,
|
state_updater,
|
||||||
|
@ -130,7 +113,6 @@ impl<S: ServiceCore> ServiceHandle<S> {
|
||||||
};
|
};
|
||||||
|
|
||||||
ServiceRunner {
|
ServiceRunner {
|
||||||
overwatch_handle: self.overwatch_handle().clone(),
|
|
||||||
service_state,
|
service_state,
|
||||||
state_handle,
|
state_handle,
|
||||||
}
|
}
|
||||||
|
@ -154,7 +136,7 @@ impl<S: ServiceCore> ServiceRunner<S> {
|
||||||
..
|
..
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
let runtime = service_state.runtime.clone();
|
let runtime = service_state.overwatch_handle.runtime().clone();
|
||||||
let service = S::init(service_state);
|
let service = S::init(service_state);
|
||||||
let (runner, abortable_handle) = abortable(service.run());
|
let (runner, abortable_handle) = abortable(service.run());
|
||||||
|
|
||||||
|
|
|
@ -45,18 +45,6 @@ pub trait ServiceCore: ServiceData + Send + Sized + 'static {
|
||||||
/// Initialize the service with the given state
|
/// Initialize the service with the given state
|
||||||
fn init(service_state: ServiceStateHandle<Self>) -> Self;
|
fn init(service_state: ServiceStateHandle<Self>) -> Self;
|
||||||
|
|
||||||
/// Service default runtime
|
|
||||||
fn service_runtime(_settings: &Self::Settings, _parent: &runtime::Handle) -> ServiceRuntime {
|
|
||||||
let id = Self::SERVICE_ID;
|
|
||||||
ServiceRuntime::Custom(
|
|
||||||
runtime::Builder::new_multi_thread()
|
|
||||||
.enable_all()
|
|
||||||
.thread_name(id)
|
|
||||||
.build()
|
|
||||||
.unwrap_or_else(|_| panic!("Async runtime for service {id} didn't build properly")),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Service main loop
|
/// Service main loop
|
||||||
async fn run(mut self);
|
async fn run(mut self);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue