Pipe lifecycle in consensus service

danielsanchezq 2023-10-10 16:51:07 +02:00
parent 0e49e06f56
commit 033ea6b9f2
2 changed files with 25 additions and 3 deletions


@@ -14,7 +14,7 @@ futures = "0.3"
 nomos-network = { path = "../network" }
 nomos-mempool = { path = "../mempool" }
 nomos-core = { path = "../../nomos-core" }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "ac28d01" }
 nomos-storage = { path = "../storage" }
 rand_chacha = "0.3"
 rand = "0.8"


@@ -19,7 +19,7 @@ use serde::Deserialize;
 use serde::{de::DeserializeOwned, Serialize};
 use serde_with::serde_as;
 use tokio::sync::oneshot::Sender;
-use tracing::instrument;
+use tracing::{error, instrument};
 // internal
 use crate::network::messages::{
     NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,
@@ -46,6 +46,7 @@ use nomos_mempool::{
 };
 use nomos_network::NetworkService;
 use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
 use overwatch_rs::services::{
     handle::ServiceStateHandle,
@@ -311,7 +312,7 @@ where
                 Event::ProposeBlock { qc }
             });
         }
-
+        let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
         loop {
             tokio::select! {
                 Some(event) = task_manager.next() => {
@@ -333,10 +334,16 @@
                 Some(msg) = self.service_state.inbound_relay.next() => {
                     Self::process_message(&carnot, msg);
                 }
+                Some(msg) = lifecycle_stream.next() => {
+                    if Self::handle_lifecycle_message(msg).await {
+                        break;
+                    }
+                }
             }
         }
+        Ok(())
     }
 }

 #[derive(Debug)]
 #[allow(clippy::large_enum_variant)]
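
The new select! arm and the break it guards are the whole lifecycle pipe on the service side. As a reference point, here is a standalone sketch of that control flow using plain tokio channels as stand-ins for the inbound relay and the overwatch-rs lifecycle stream; every type and name in it is illustrative, and only the loop shape mirrors the commit.

// Illustrative stand-ins only: plain tokio channels instead of the real
// overwatch-rs relay and lifecycle stream.
use tokio::sync::{mpsc, oneshot};

// Minimal stand-in for the lifecycle message as this commit uses it:
// Shutdown carries a oneshot sender to acknowledge on, Kill stops immediately.
enum Lifecycle {
    Shutdown(oneshot::Sender<()>),
    Kill,
}

async fn run_loop(
    mut inbound: mpsc::Receiver<String>,
    mut lifecycle: mpsc::Receiver<Lifecycle>,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        tokio::select! {
            Some(msg) = inbound.recv() => {
                // Normal service work (process_message in the real service).
                println!("processing {msg}");
            }
            Some(msg) = lifecycle.recv() => {
                // Same role as handle_lifecycle_message: true means "stop".
                let stop = match msg {
                    Lifecycle::Shutdown(ack) => {
                        let _ = ack.send(()); // acknowledge the shutdown request
                        true
                    }
                    Lifecycle::Kill => true,
                };
                if stop {
                    break;
                }
            }
            else => break, // both channels closed
        }
    }
    // Once the loop can `break`, it no longer has type `!`, so an explicit
    // Ok(()) is required after it, which is what the diff adds.
    Ok(())
}

Returning from the run future after the break is what actually ends the service task; before this change the loop had no exit path and the function never returned.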
@@ -389,6 +396,21 @@ where
     DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
     Storage: StorageBackend + Send + Sync + 'static,
 {
+    async fn handle_lifecycle_message(message: LifecycleMessage) -> bool {
+        match message {
+            LifecycleMessage::Shutdown(sender) => {
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
+
     fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
         match msg {
             ConsensusMsg::Info { tx } => {
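
On the other side of the pipe, the only contract the diff reveals is that Shutdown carries a sender accepting () and that Kill is a bare variant. Presumably the runtime drives the handshake roughly as in the sketch below: send Shutdown with a oneshot sender, wait a bounded time for the service's acknowledgement, and escalate to Kill if it never comes. The enum and channel wiring here are local stand-ins built on tokio primitives, not the overwatch-rs API.

use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

// Local stand-in mirroring the two lifecycle cases handled above.
enum Lifecycle {
    Shutdown(oneshot::Sender<()>),
    Kill,
}

#[tokio::main]
async fn main() {
    let (life_tx, mut life_rx) = mpsc::channel::<Lifecycle>(1);

    // Toy "service": waits for a lifecycle message and acknowledges Shutdown.
    let service = tokio::spawn(async move {
        while let Some(msg) = life_rx.recv().await {
            match msg {
                Lifecycle::Shutdown(ack) => {
                    let _ = ack.send(()); // tell the caller we stopped cleanly
                    break;
                }
                Lifecycle::Kill => break,
            }
        }
    });

    // Graceful path: request shutdown and wait a bounded time for the ack.
    let (ack_tx, ack_rx) = oneshot::channel();
    if life_tx.send(Lifecycle::Shutdown(ack_tx)).await.is_err() {
        eprintln!("service already stopped");
    }
    match tokio::time::timeout(Duration::from_secs(1), ack_rx).await {
        Ok(Ok(())) => println!("service acknowledged shutdown"),
        _ => println!("no ack received; a runtime could escalate to Kill here"),
    }

    service.await.expect("service task panicked");
}

The error! branch in handle_lifecycle_message covers the case where that acknowledgement channel is already closed on the caller's side: the service still shuts down, it just logs that the ack could not be delivered.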