Pipe lifecycle in log service

This commit is contained in:
danielsanchezq 2023-10-10 14:40:28 +02:00
parent e157359750
commit 8be76f5381
1 changed files with 41 additions and 6 deletions

View File

@ -1,12 +1,14 @@
// std
use futures::StreamExt;
use std::net::SocketAddr;
use std::path::PathBuf;
// crates
use serde::{Deserialize, Serialize};
use tracing::Level;
use tracing::{error, Level};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::LevelFilter, prelude::*};
// internal
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::NoMessage,
@ -14,7 +16,10 @@ use overwatch_rs::services::{
ServiceCore, ServiceData,
};
pub struct Logger(Option<WorkerGuard>);
pub struct Logger {
service_state: ServiceStateHandle<Self>,
worker_guard: Option<WorkerGuard>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LoggerBackend {
@ -66,10 +71,10 @@ pub enum LoggerFormat {
impl ServiceData for Logger {
const SERVICE_ID: &'static str = "Logger";
type Settings = LoggerSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
type Settings = LoggerSettings;
}
// a macro and not a function because it's a bit of a type
@ -102,7 +107,10 @@ impl ServiceCore for Logger {
.runtime()
.spawn(async move { task.connect().await });
registry_init!(layer, config.format, config.level);
return Ok(Self(None));
return Ok(Self {
service_state,
worker_guard: None,
});
}
LoggerBackend::File { directory, prefix } => {
let file_appender = tracing_appender::rolling::hourly(
@ -119,12 +127,39 @@ impl ServiceCore for Logger {
.with_level(true)
.with_writer(non_blocking);
registry_init!(layer, config.format, config.level);
Ok(Self(Some(_guard)))
Ok(Self {
service_state,
worker_guard: Some(_guard),
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state,
worker_guard,
} = self;
// keep the handle alive without stressing the runtime
futures::pending!();
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
if let Some(msg) = lifecycle_stream.next().await {
match msg {
LifecycleMessage::Shutdown(sender) => {
// flush pending logs before signaling message processing
drop(worker_guard);
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
break;
}
LifecycleMessage::Kill => {
break;
}
}
}
}
Ok(())
}
}