mirror of
https://github.com/logos-co/Overwatch.git
synced 2025-01-18 10:31:35 +00:00
Immutable connect (#10)
* chore: remove unused PhantomData * &mut self to &self for some methods
This commit is contained in:
parent
32e6e59377
commit
4e2978e643
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
**target/
|
@ -34,7 +34,7 @@ impl OverwatchHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Send a shutdown signal to the overwatch runner
|
/// Send a shutdown signal to the overwatch runner
|
||||||
pub async fn shutdown(&mut self) {
|
pub async fn shutdown(&self) {
|
||||||
info!("Shutting down Overwatch");
|
info!("Shutting down Overwatch");
|
||||||
if let Err(e) = self
|
if let Err(e) = self
|
||||||
.sender
|
.sender
|
||||||
@ -48,7 +48,7 @@ impl OverwatchHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Send a kill signal to the overwatch runner
|
/// Send a kill signal to the overwatch runner
|
||||||
pub async fn kill(&mut self) {
|
pub async fn kill(&self) {
|
||||||
info!("Killing Overwatch");
|
info!("Killing Overwatch");
|
||||||
if let Err(e) = self
|
if let Err(e) = self
|
||||||
.sender
|
.sender
|
||||||
@ -63,14 +63,14 @@ impl OverwatchHandle {
|
|||||||
|
|
||||||
/// Send an overwatch command to the overwatch runner
|
/// Send an overwatch command to the overwatch runner
|
||||||
#[instrument(name = "overwatch-command-send", skip(self))]
|
#[instrument(name = "overwatch-command-send", skip(self))]
|
||||||
pub async fn send(&mut self, command: OverwatchCommand) {
|
pub async fn send(&self, command: OverwatchCommand) {
|
||||||
if let Err(e) = self.sender.send(command).await {
|
if let Err(e) = self.sender.send(command).await {
|
||||||
error!(error=?e, "Error sending overwatch command");
|
error!(error=?e, "Error sending overwatch command");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self))]
|
#[instrument(skip(self))]
|
||||||
pub async fn update_settings<S: Services>(&mut self, settings: S::Settings) {
|
pub async fn update_settings<S: Services>(&self, settings: S::Settings) {
|
||||||
if let Err(e) = self
|
if let Err(e) = self
|
||||||
.sender
|
.sender
|
||||||
.send(OverwatchCommand::Settings(SettingsCommand(Box::new(
|
.send(OverwatchCommand::Settings(SettingsCommand(Box::new(
|
||||||
|
@ -265,7 +265,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn run_overwatch_then_stop() {
|
fn run_overwatch_then_stop() {
|
||||||
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
|
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
|
||||||
let mut handle = overwatch.handle().clone();
|
let handle = overwatch.handle().clone();
|
||||||
|
|
||||||
overwatch.spawn(async move {
|
overwatch.spawn(async move {
|
||||||
sleep(Duration::from_millis(500)).await;
|
sleep(Duration::from_millis(500)).await;
|
||||||
@ -278,7 +278,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn run_overwatch_then_kill() {
|
fn run_overwatch_then_kill() {
|
||||||
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
|
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
|
||||||
let mut handle = overwatch.handle().clone();
|
let handle = overwatch.handle().clone();
|
||||||
|
|
||||||
overwatch.spawn(async move {
|
overwatch.spawn(async move {
|
||||||
sleep(Duration::from_millis(500)).await;
|
sleep(Duration::from_millis(500)).await;
|
||||||
|
@ -1,5 +1,3 @@
|
|||||||
// std
|
|
||||||
use std::marker::PhantomData;
|
|
||||||
// crates
|
// crates
|
||||||
use futures::future::{abortable, AbortHandle};
|
use futures::future::{abortable, AbortHandle};
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
@ -25,7 +23,6 @@ pub struct ServiceHandle<S: ServiceCore> {
|
|||||||
overwatch_handle: OverwatchHandle,
|
overwatch_handle: OverwatchHandle,
|
||||||
settings: SettingsUpdater<S::Settings>,
|
settings: SettingsUpdater<S::Settings>,
|
||||||
initial_state: S::State,
|
initial_state: S::State,
|
||||||
_marker: PhantomData<S>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Service core resources
|
/// Service core resources
|
||||||
@ -58,7 +55,6 @@ impl<S: ServiceCore> ServiceHandle<S> {
|
|||||||
settings,
|
settings,
|
||||||
initial_state,
|
initial_state,
|
||||||
overwatch_handle,
|
overwatch_handle,
|
||||||
_marker: PhantomData::default(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,13 +141,13 @@ impl<S: ServiceCore> Relay<S> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self), err(Debug))]
|
#[instrument(skip(self), err(Debug))]
|
||||||
pub async fn connect(&mut self) -> Result<OutboundRelay<S::Message>, RelayError> {
|
pub async fn connect(&self) -> Result<OutboundRelay<S::Message>, RelayError> {
|
||||||
let (reply, receiver) = oneshot::channel();
|
let (reply, receiver) = oneshot::channel();
|
||||||
self.request_relay(reply).await;
|
self.request_relay(reply).await;
|
||||||
self.handle_relay_response(receiver).await
|
self.handle_relay_response(receiver).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn request_relay(&mut self, reply: oneshot::Sender<RelayResult>) {
|
async fn request_relay(&self, reply: oneshot::Sender<RelayResult>) {
|
||||||
let relay_command = OverwatchCommand::Relay(RelayCommand {
|
let relay_command = OverwatchCommand::Relay(RelayCommand {
|
||||||
service_id: S::SERVICE_ID,
|
service_id: S::SERVICE_ID,
|
||||||
reply_channel: ReplyChannel(reply),
|
reply_channel: ReplyChannel(reply),
|
||||||
@ -157,7 +157,7 @@ impl<S: ServiceCore> Relay<S> {
|
|||||||
|
|
||||||
#[instrument(skip_all, err(Debug))]
|
#[instrument(skip_all, err(Debug))]
|
||||||
async fn handle_relay_response(
|
async fn handle_relay_response(
|
||||||
&mut self,
|
&self,
|
||||||
receiver: oneshot::Receiver<RelayResult>,
|
receiver: oneshot::Receiver<RelayResult>,
|
||||||
) -> Result<OutboundRelay<S::Message>, RelayError> {
|
) -> Result<OutboundRelay<S::Message>, RelayError> {
|
||||||
let response = receiver.await;
|
let response = receiver.await;
|
||||||
|
@ -22,7 +22,7 @@ impl<S: Clone> SettingsNotifier<S> {
|
|||||||
// of the method. Another option would be to spawn a task that updates a settings local value
|
// of the method. Another option would be to spawn a task that updates a settings local value
|
||||||
// each time an updated settings is received. This could not be so easy to do, since it will
|
// each time an updated settings is received. This could not be so easy to do, since it will
|
||||||
// need to hold a &mut to the holder (or needed to use a Cell/RefCell).
|
// need to hold a &mut to the holder (or needed to use a Cell/RefCell).
|
||||||
pub fn get_updated_settings(&mut self) -> S {
|
pub fn get_updated_settings(&self) -> S {
|
||||||
self.notifier_channel.borrow().clone()
|
self.notifier_channel.borrow().clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,7 +67,7 @@ mod test {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn settings_updater_sequence() {
|
async fn settings_updater_sequence() {
|
||||||
let updater = SettingsUpdater::new(10usize);
|
let updater = SettingsUpdater::new(10usize);
|
||||||
let mut notifier = updater.notifier();
|
let notifier = updater.notifier();
|
||||||
let values = [10, 0usize];
|
let values = [10, 0usize];
|
||||||
let mut seq = HashSet::from(values);
|
let mut seq = HashSet::from(values);
|
||||||
let handle = tokio::spawn(timeout(Duration::from_secs(3), async move {
|
let handle = tokio::spawn(timeout(Duration::from_secs(3), async move {
|
||||||
|
@ -127,7 +127,7 @@ impl<T> Clone for StateWatcher<T> {
|
|||||||
|
|
||||||
impl<S: ServiceState> StateUpdater<S> {
|
impl<S: ServiceState> StateUpdater<S> {
|
||||||
/// Send a new state and notify the [`StateWatcher`]
|
/// Send a new state and notify the [`StateWatcher`]
|
||||||
pub fn update(&mut self, new_state: S) {
|
pub fn update(&self, new_state: S) {
|
||||||
self.sender.send(new_state).unwrap_or_else(|_e| {
|
self.sender.send(new_state).unwrap_or_else(|_e| {
|
||||||
error!("Error updating state");
|
error!("Error updating state");
|
||||||
});
|
});
|
||||||
@ -227,7 +227,7 @@ mod test {
|
|||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[should_panic]
|
#[should_panic]
|
||||||
async fn state_stream_collects() {
|
async fn state_stream_collects() {
|
||||||
let (handle, mut updater): (
|
let (handle, updater): (
|
||||||
StateHandle<UsizeCounter, PanicOnGreaterThanTen>,
|
StateHandle<UsizeCounter, PanicOnGreaterThanTen>,
|
||||||
StateUpdater<UsizeCounter>,
|
StateUpdater<UsizeCounter>,
|
||||||
) = StateHandle::new(
|
) = StateHandle::new(
|
||||||
|
@ -86,8 +86,8 @@ struct TestApp {
|
|||||||
fn derive_print_service() {
|
fn derive_print_service() {
|
||||||
let settings: TestAppServiceSettings = TestAppServiceSettings { print_service: () };
|
let settings: TestAppServiceSettings = TestAppServiceSettings { print_service: () };
|
||||||
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
|
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
|
||||||
let mut handle = overwatch.handle().clone();
|
let handle = overwatch.handle().clone();
|
||||||
let mut print_service_relay = handle.relay::<PrintService>();
|
let print_service_relay = handle.relay::<PrintService>();
|
||||||
|
|
||||||
overwatch.spawn(async move {
|
overwatch.spawn(async move {
|
||||||
let print_service_relay = print_service_relay
|
let print_service_relay = print_service_relay
|
||||||
|
@ -35,11 +35,9 @@ impl ServiceCore for SettingsService {
|
|||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
let Self {
|
let Self {
|
||||||
state:
|
state: ServiceStateHandle {
|
||||||
ServiceStateHandle {
|
settings_reader, ..
|
||||||
mut settings_reader,
|
},
|
||||||
..
|
|
||||||
},
|
|
||||||
} = self;
|
} = self;
|
||||||
|
|
||||||
let print = async move {
|
let print = async move {
|
||||||
@ -72,7 +70,7 @@ fn settings_service_update_settings() {
|
|||||||
};
|
};
|
||||||
let overwatch = OverwatchRunner::<TestApp>::run(settings.clone(), None);
|
let overwatch = OverwatchRunner::<TestApp>::run(settings.clone(), None);
|
||||||
let handle = overwatch.handle().clone();
|
let handle = overwatch.handle().clone();
|
||||||
let mut handle2 = handle.clone();
|
let handle2 = handle.clone();
|
||||||
settings.settings_service = "New settings".to_string();
|
settings.settings_service = "New settings".to_string();
|
||||||
overwatch.spawn(async move { handle.clone().update_settings::<TestApp>(settings).await });
|
overwatch.spawn(async move { handle.clone().update_settings::<TestApp>(settings).await });
|
||||||
|
|
||||||
|
@ -69,9 +69,7 @@ impl ServiceCore for UpdateStateService {
|
|||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
let Self {
|
let Self {
|
||||||
state: ServiceStateHandle {
|
state: ServiceStateHandle { state_updater, .. },
|
||||||
mut state_updater, ..
|
|
||||||
},
|
|
||||||
} = self;
|
} = self;
|
||||||
for value in 0..10 {
|
for value in 0..10 {
|
||||||
state_updater.update(CounterState { value });
|
state_updater.update(CounterState { value });
|
||||||
@ -91,7 +89,7 @@ fn state_update_service() {
|
|||||||
update_state_service: (),
|
update_state_service: (),
|
||||||
};
|
};
|
||||||
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
|
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
|
||||||
let mut handle = overwatch.handle().clone();
|
let handle = overwatch.handle().clone();
|
||||||
|
|
||||||
overwatch.spawn(async move {
|
overwatch.spawn(async move {
|
||||||
sleep(Duration::from_secs(1)).await;
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user