Implement base lifecycle handling in network service

This commit is contained in:
danielsanchezq 2023-10-09 16:28:37 +02:00
parent 4a562ec188
commit e85c2ebe60
1 changed files with 55 additions and 18 deletions

View File

@ -4,17 +4,20 @@ use std::fmt::{self, Debug};
// crates
use async_trait::async_trait;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::oneshot;
// internal
use backends::NetworkBackend;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, ServiceState},
ServiceCore, ServiceData, ServiceId,
};
use tracing::error;
pub enum NetworkMsg<B: NetworkBackend> {
Process(B::Message),
@ -84,33 +87,67 @@ where
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state: ServiceStateHandle {
mut inbound_relay, ..
},
service_state:
ServiceStateHandle {
mut inbound_relay,
lifecycle_handle,
..
},
mut backend,
} = self;
while let Some(msg) = inbound_relay.recv().await {
match msg {
NetworkMsg::Process(msg) => {
// split sending in two steps to help the compiler understand we do not
// need to hold an instance of &I (which is not send) across an await point
let _send = backend.process(msg);
_send.await
let mut lifecycle_stream = lifecycle_handle.message_stream();
loop {
tokio::select! {
Some(msg) = inbound_relay.recv() => {
handle_network_service_message(msg, &mut backend).await;
}
Some(msg) = lifecycle_stream.next() => {
if handle_lifecycle_message::<Self>(msg).await {
break;
}
}
NetworkMsg::Subscribe { kind, sender } => sender
.send(backend.subscribe(kind).await)
.unwrap_or_else(|_| {
tracing::warn!(
"client hung up before a subscription handle could be established"
)
}),
}
}
Ok(())
}
}
async fn handle_network_service_message<B>(msg: NetworkMsg<B>, backend: &mut B)
where
B: NetworkBackend + Send + 'static,
B::State: Send + Sync,
{
match msg {
NetworkMsg::Process(msg) => {
// split sending in two steps to help the compiler understand we do not
// need to hold an instance of &I (which is not send) across an await point
let _send = backend.process(msg);
_send.await
}
NetworkMsg::Subscribe { kind, sender } => sender
.send(backend.subscribe(kind).await)
.unwrap_or_else(|_| {
tracing::warn!("client hung up before a subscription handle could be established")
}),
}
}
async fn handle_lifecycle_message<S: ServiceData>(msg: LifecycleMessage) -> bool {
match msg {
overwatch_rs::services::life_cycle::LifecycleMessage::Kill => true,
overwatch_rs::services::life_cycle::LifecycleMessage::Shutdown(signal_sender) => {
// TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages?
if let Err(_e) = signal_sender.send(()) {
error!(
"Error sending successful shutdown signal from service {}",
S::SERVICE_ID
);
}
true
}
}
}
impl<B: NetworkBackend> Clone for NetworkConfig<B> {
fn clone(&self) -> Self {
NetworkConfig {