Implement base lifecycle handling in storage service

This commit is contained in:
danielsanchezq 2023-10-09 16:57:29 +02:00
parent e85c2ebe60
commit 2dfceb0eda
3 changed files with 53 additions and 18 deletions

View File

@ -137,7 +137,7 @@ async fn handle_lifecycle_message<S: ServiceData>(msg: LifecycleMessage) -> bool
overwatch_rs::services::life_cycle::LifecycleMessage::Kill => true,
overwatch_rs::services::life_cycle::LifecycleMessage::Shutdown(signal_sender) => {
// TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages?
if let Err(_e) = signal_sender.send(()) {
if signal_sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
S::SERVICE_ID

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
async-trait = "0.1"
futures = "0.3"
tokio = { version = "1", features = ["sync"] }
bytes = "1.2"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }

View File

@ -6,15 +6,18 @@ use std::marker::PhantomData;
// crates
use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use overwatch_rs::services::handle::ServiceStateHandle;
use serde::de::DeserializeOwned;
use serde::Serialize;
// internal
use backends::StorageBackend;
use backends::{StorageSerde, StorageTransaction};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use tracing::error;
/// Storage message that maps to [`StorageBackend`] trait
pub enum StorageMsg<Backend: StorageBackend> {
@ -162,6 +165,39 @@ pub struct StorageService<Backend: StorageBackend + Send + Sync + 'static> {
}
impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
async fn handle_lifecycle_message(msg: LifecycleMessage) -> bool {
match msg {
LifecycleMessage::Shutdown(sender) => {
// TODO: Try to finish pending transactions if any and close connections properly
if sender.send(()).is_err() {
error!(
"Error sending successful shutdown signal from service {}",
Self::SERVICE_ID
);
}
true
}
LifecycleMessage::Kill => true,
}
}
async fn handle_storage_message(msg: StorageMsg<Backend>, backend: &mut Backend) {
if let Err(e) = match msg {
StorageMsg::Load { key, reply_channel } => {
Self::handle_load(backend, key, reply_channel).await
}
StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
StorageMsg::Remove { key, reply_channel } => {
Self::handle_remove(backend, key, reply_channel).await
}
StorageMsg::Execute {
transaction,
reply_channel,
} => Self::handle_execute(backend, transaction, reply_channel).await,
} {
// TODO: add proper logging
println!("{e}");
}
}
/// Handle load message
async fn handle_load(
backend: &mut Backend,
@ -243,27 +279,25 @@ impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageSer
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut backend,
service_state: ServiceStateHandle {
mut inbound_relay, ..
},
service_state:
ServiceStateHandle {
mut inbound_relay,
lifecycle_handle,
..
},
} = self;
let mut lifecycle_stream = lifecycle_handle.message_stream();
let backend = &mut backend;
while let Some(msg) = inbound_relay.recv().await {
if let Err(e) = match msg {
StorageMsg::Load { key, reply_channel } => {
Self::handle_load(backend, key, reply_channel).await
loop {
tokio::select! {
Some(msg) = inbound_relay.recv() => {
Self::handle_storage_message(msg, backend).await;
}
StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
StorageMsg::Remove { key, reply_channel } => {
Self::handle_remove(backend, key, reply_channel).await
Some(msg) = lifecycle_stream.next() => {
if Self::handle_lifecycle_message(msg).await {
break;
}
}
StorageMsg::Execute {
transaction,
reply_channel,
} => Self::handle_execute(backend, transaction, reply_channel).await,
} {
// TODO: add proper logging
println!("{e}");
}
}
Ok(())