From 033ea6b9f280141305bee6e2da05120f0fb88f5d Mon Sep 17 00:00:00 2001 From: danielsanchezq Date: Tue, 10 Oct 2023 16:51:07 +0200 Subject: [PATCH] Pipe lifecycle in consensus service --- nomos-services/consensus/Cargo.toml | 2 +- nomos-services/consensus/src/lib.rs | 26 ++++++++++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/nomos-services/consensus/Cargo.toml b/nomos-services/consensus/Cargo.toml index c9246aef..11bae7e1 100644 --- a/nomos-services/consensus/Cargo.toml +++ b/nomos-services/consensus/Cargo.toml @@ -14,7 +14,7 @@ futures = "0.3" nomos-network = { path = "../network" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core" } -overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "ac28d01" } nomos-storage = { path = "../storage" } rand_chacha = "0.3" rand = "0.8" diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 6d5b26d6..ff705c94 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -19,7 +19,7 @@ use serde::Deserialize; use serde::{de::DeserializeOwned, Serialize}; use serde_with::serde_as; use tokio::sync::oneshot::Sender; -use tracing::instrument; +use tracing::{error, instrument}; // internal use crate::network::messages::{ NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg, @@ -46,6 +46,7 @@ use nomos_mempool::{ }; use nomos_network::NetworkService; use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -311,7 +312,7 @@ where Event::ProposeBlock { qc } }); } - + let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); loop { tokio::select! { Some(event) = task_manager.next() => { @@ -333,8 +334,14 @@ where Some(msg) = self.service_state.inbound_relay.next() => { Self::process_message(&carnot, msg); } + Some(msg) = lifecycle_stream.next() => { + if Self::handle_lifecycle_message(msg).await { + break; + } + } } } + Ok(()) } } @@ -389,6 +396,21 @@ where DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, Storage: StorageBackend + Send + Sync + 'static, { + async fn handle_lifecycle_message(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + fn process_message(carnot: &Carnot, msg: ConsensusMsg) { match msg { ConsensusMsg::Info { tx } => {