Lifecycle update and implementations (#457)
* Update deps
* Implement base lifecycle handling in network service
* Implement base lifecycle handling in storage service
* Use methods instead of functions
* Pipe lifecycle in metrics service
* Pipe lifecycle in mempool service
* Pipe lifecycle in log service
* Pipe lifecycle in da service
* Pipe lifecycle in consensus service
* Refactor handling of lifecycle message to should_stop_service
* Update overwatch version to fixed run_all one
parent 89b8e27612
commit 75b36020c2
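Every service touched by this commit converges on the same shutdown shape: a `should_stop_service` helper that acknowledges `Shutdown` through the embedded channel (or accepts `Kill`) and returns whether to exit, plus a `tokio::select!` loop that polls the lifecycle stream next to the service's inbound relay. Before the per-file diff, here is a minimal standalone sketch of that handshake; it uses plain tokio channels as stand-ins for Overwatch's relay and lifecycle types, and names like `ExampleService` are illustrative, not from the diff:

use tokio::sync::{mpsc, oneshot};

// Local stand-in for overwatch_rs::services::life_cycle::LifecycleMessage.
enum LifecycleMessage {
    Shutdown(oneshot::Sender<()>),
    Kill,
}

const SERVICE_ID: &str = "ExampleService";

// Mirrors the should_stop_service helper added to each service below.
async fn should_stop_service(msg: LifecycleMessage) -> bool {
    match msg {
        LifecycleMessage::Shutdown(sender) => {
            if sender.send(()).is_err() {
                eprintln!("Error sending successful shutdown signal from service {SERVICE_ID}");
            }
            true
        }
        LifecycleMessage::Kill => true,
    }
}

// Mirrors the select loop each service's run() now uses.
async fn run(
    mut inbound: mpsc::Receiver<String>,
    mut lifecycle: mpsc::Receiver<LifecycleMessage>,
) {
    loop {
        tokio::select! {
            Some(msg) = inbound.recv() => {
                // service-specific message handling goes here
                println!("handling: {msg}");
            }
            Some(msg) = lifecycle.recv() => {
                if should_stop_service(msg).await {
                    break;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (in_tx, in_rx) = mpsc::channel(8);
    let (lc_tx, lc_rx) = mpsc::channel(8);
    let service = tokio::spawn(run(in_rx, lc_rx));

    in_tx.send("hello".to_owned()).await.unwrap();
    // Graceful shutdown: send Shutdown with a oneshot and wait for the ack.
    let (ack_tx, ack_rx) = oneshot::channel();
    lc_tx.send(LifecycleMessage::Shutdown(ack_tx)).await.unwrap();
    ack_rx.await.expect("service acked shutdown");
    service.await.unwrap();
}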
@@ -9,8 +9,8 @@ mixnet-node = { path = "../../mixnet/node" }
 nomos-log = { path = "../../nomos-services/log" }
 clap = { version = "4", features = ["derive"] }
 color-eyre = "0.6.0"
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 serde = "1"
 serde_yaml = "0.9"
 tracing = "0.1"

@@ -14,8 +14,8 @@ chrono = "0.4"
 futures = "0.3"
 http = "0.2.9"
 hex = "0.4.3"
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 tracing = "0.1"
 multiaddr = "0.18"
 nomos-core = { path = "../../nomos-core" }

@@ -15,8 +15,8 @@ clap = {version = "4", features = ["derive"] }
 serde_yaml = "0.9"
 futures = "0.3"
 tokio = { version = "1", features = ["sync"] }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
 nomos-da = { path = "../nomos-services/data-availability", features = ["libp2p"] }
 nomos-libp2p = { path = "../nomos-libp2p"}

@@ -8,8 +8,8 @@ axum = ["dep:axum", "dep:hyper", "utoipa-swagger-ui/axum"]

 [dependencies]
 async-trait = "0.1"
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 tracing = "0.1"
 utoipa = "4.0"
 utoipa-swagger-ui = { version = "4.0" }

@@ -14,7 +14,7 @@ futures = "0.3"
 nomos-network = { path = "../network" }
 nomos-mempool = { path = "../mempool" }
 nomos-core = { path = "../../nomos-core" }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch",rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 nomos-storage = { path = "../storage" }
 rand_chacha = "0.3"
 rand = "0.8"

@@ -19,7 +19,7 @@ use serde::Deserialize;
 use serde::{de::DeserializeOwned, Serialize};
 use serde_with::serde_as;
 use tokio::sync::oneshot::Sender;
-use tracing::instrument;
+use tracing::{error, instrument};
 // internal
 use crate::network::messages::{
     NetworkMessage, NewViewMsg, ProposalMsg, TimeoutMsg, TimeoutQcMsg, VoteMsg,

@@ -46,6 +46,7 @@ use nomos_mempool::{
 };
 use nomos_network::NetworkService;
 use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
 use overwatch_rs::services::{
     handle::ServiceStateHandle,

@@ -311,7 +312,7 @@ where
                 Event::ProposeBlock { qc }
             });
         }
-
+        let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
         loop {
             tokio::select! {
                 Some(event) = task_manager.next() => {

@@ -333,8 +334,14 @@ where
                 Some(msg) = self.service_state.inbound_relay.next() => {
                     Self::process_message(&carnot, msg);
                 }
+                Some(msg) = lifecycle_stream.next() => {
+                    if Self::should_stop_service(msg).await {
+                        break;
+                    }
+                }
             }
         }
+        Ok(())
     }
 }

@@ -389,6 +396,21 @@ where
     DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
     Storage: StorageBackend + Send + Sync + 'static,
 {
+    async fn should_stop_service(message: LifecycleMessage) -> bool {
+        match message {
+            LifecycleMessage::Shutdown(sender) => {
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
+
     fn process_message(carnot: &Carnot<O>, msg: ConsensusMsg) {
         match msg {
             ConsensusMsg::Info { tx } => {

@@ -11,7 +11,7 @@ futures = "0.3"
 moka = { version = "0.11", features = ["future"] }
 nomos-core = { path = "../../nomos-core" }
 nomos-network = { path = "../network" }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 serde = "1.0"
 tracing = "0.1"
 tokio = { version = "1", features = ["sync", "macros"] }

@@ -14,9 +14,11 @@ use crate::network::NetworkAdapter;
 use nomos_core::da::{blob::Blob, DaProtocol};
 use nomos_network::NetworkService;
 use overwatch_rs::services::handle::ServiceStateHandle;
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::relay::{Relay, RelayMessage};
 use overwatch_rs::services::state::{NoOperator, NoState};
 use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
+use tracing::error;

 pub struct DataAvailabilityService<Protocol, Backend, Network>
 where

@@ -70,6 +72,76 @@ where
     type Message = DaMsg<Protocol::Blob>;
 }

+impl<Protocol, Backend, Network> DataAvailabilityService<Protocol, Backend, Network>
+where
+    Protocol: DaProtocol + Send + Sync,
+    Backend: DaBackend<Blob = Protocol::Blob> + Send + Sync,
+    Protocol::Settings: Clone + Send + Sync + 'static,
+    Protocol::Blob: 'static,
+    Backend::Settings: Clone + Send + Sync + 'static,
+    Protocol::Blob: Send,
+    Protocol::Attestation: Send,
+    <Backend::Blob as Blob>::Hash: Debug + Send + Sync,
+    Network:
+        NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
+{
+    async fn handle_new_blob(
+        da: &Protocol,
+        backend: &Backend,
+        adapter: &Network,
+        blob: Protocol::Blob,
+    ) -> Result<(), DaError> {
+        // we need to handle the reply (verification + signature)
+        let attestation = da.attest(&blob);
+        backend.add_blob(blob).await?;
+        // we do not call `da.recv_blob` here because that is meant to
+        // be called to retrieve the original data, while here we're only interested
+        // in storing the blob.
+        // We might want to refactor the backend to be part of implementations of the
+        // Da protocol instead of this service and clear this confusion.
+        adapter
+            .send_attestation(attestation)
+            .await
+            .map_err(DaError::Dyn)
+    }
+
+    async fn handle_da_msg(backend: &Backend, msg: DaMsg<Backend::Blob>) -> Result<(), DaError> {
+        match msg {
+            DaMsg::RemoveBlobs { blobs } => {
+                futures::stream::iter(blobs)
+                    .for_each_concurrent(None, |blob| async move {
+                        if let Err(e) = backend.remove_blob(&blob).await {
+                            tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
+                        }
+                    })
+                    .await;
+            }
+            DaMsg::Get { ids, reply_channel } => {
+                let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
+                if reply_channel.send(res).is_err() {
+                    tracing::error!("Could not returns blobs");
+                }
+            }
+        }
+        Ok(())
+    }
+
+    async fn should_stop_service(message: LifecycleMessage) -> bool {
+        match message {
+            LifecycleMessage::Shutdown(sender) => {
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
+}
+
 #[async_trait::async_trait]
 impl<Protocol, Backend, Network> ServiceCore for DataAvailabilityService<Protocol, Backend, Network>
 where

@@ -111,71 +183,30 @@ where

         let adapter = Network::new(network_relay).await;
         let mut network_blobs = adapter.blob_stream().await;
+        let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
         loop {
             tokio::select! {
                 Some(blob) = network_blobs.next() => {
-                    if let Err(e) = handle_new_blob(&da, &backend, &adapter, blob).await {
+                    if let Err(e) = Self::handle_new_blob(&da, &backend, &adapter, blob).await {
                         tracing::debug!("Failed to add a new received blob: {e:?}");
                     }
                 }
                 Some(msg) = service_state.inbound_relay.recv() => {
-                    if let Err(e) = handle_da_msg(&backend, msg).await {
+                    if let Err(e) = Self::handle_da_msg(&backend, msg).await {
                         tracing::debug!("Failed to handle da msg: {e:?}");
                     }
                 }
+                Some(msg) = lifecycle_stream.next() => {
+                    if Self::should_stop_service(msg).await {
+                        break;
+                    }
+                }
             }
         }
+        Ok(())
     }
 }

-async fn handle_new_blob<
-    Protocol: DaProtocol,
-    Backend: DaBackend<Blob = Protocol::Blob>,
-    A: NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation>,
->(
-    da: &Protocol,
-    backend: &Backend,
-    adapter: &A,
-    blob: Protocol::Blob,
-) -> Result<(), DaError> {
-    // we need to handle the reply (verification + signature)
-    let attestation = da.attest(&blob);
-    backend.add_blob(blob).await?;
-    // we do not call `da.recv_blob` here because that is meant to
-    // be called to retrieve the original data, while here we're only interested
-    // in storing the blob.
-    // We might want to refactor the backend to be part of implementations of the
-    // Da protocol instead of this service and clear this confusion.
-    adapter
-        .send_attestation(attestation)
-        .await
-        .map_err(DaError::Dyn)
-}
-
-async fn handle_da_msg<B: DaBackend>(backend: &B, msg: DaMsg<B::Blob>) -> Result<(), DaError>
-where
-    <B::Blob as Blob>::Hash: Debug,
-{
-    match msg {
-        DaMsg::RemoveBlobs { blobs } => {
-            futures::stream::iter(blobs)
-                .for_each_concurrent(None, |blob| async move {
-                    if let Err(e) = backend.remove_blob(&blob).await {
-                        tracing::debug!("Could not remove blob {blob:?} due to: {e:?}");
-                    }
-                })
-                .await;
-        }
-        DaMsg::Get { ids, reply_channel } => {
-            let res = ids.filter_map(|id| backend.get_blob(&id)).collect();
-            if reply_channel.send(res).is_err() {
-                tracing::error!("Could not returns blobs");
-            }
-        }
-    }
-    Ok(())
-}
-
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct Settings<P, B> {
     pub da_protocol: P,

@@ -25,8 +25,8 @@ clap = { version = "4", features = ["derive", "env"], optional = true }
 futures = "0.3"
 http = "0.2.9"
 hyper = { version = "0.14", optional = true }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 parking_lot = { version = "0.12", optional = true }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0", optional = true }

@@ -7,7 +7,7 @@ edition = "2021"

 [dependencies]
 async-trait = "0.1"
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 serde = { version = "1.0", features = ["derive"] }
 tracing = "0.1"
 tracing-appender = "0.2"

@@ -1,12 +1,14 @@
 // std
+use futures::StreamExt;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 // crates
 use serde::{Deserialize, Serialize};
-use tracing::Level;
+use tracing::{error, Level};
 use tracing_appender::non_blocking::WorkerGuard;
 use tracing_subscriber::{filter::LevelFilter, prelude::*};
 // internal
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::{
     handle::ServiceStateHandle,
     relay::NoMessage,

@@ -14,7 +16,10 @@ use overwatch_rs::services::{
     ServiceCore, ServiceData,
 };

-pub struct Logger(Option<WorkerGuard>);
+pub struct Logger {
+    service_state: ServiceStateHandle<Self>,
+    worker_guard: Option<WorkerGuard>,
+}

 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub enum LoggerBackend {

@@ -66,10 +71,10 @@ pub enum LoggerFormat {

 impl ServiceData for Logger {
     const SERVICE_ID: &'static str = "Logger";
+    type Settings = LoggerSettings;
     type State = NoState<Self::Settings>;
     type StateOperator = NoOperator<Self::State>;
     type Message = NoMessage;
-    type Settings = LoggerSettings;
 }

 // a macro and not a function because it's a bit of a type

@@ -102,7 +107,10 @@ impl ServiceCore for Logger {
                     .runtime()
                     .spawn(async move { task.connect().await });
                 registry_init!(layer, config.format, config.level);
-                return Ok(Self(None));
+                return Ok(Self {
+                    service_state,
+                    worker_guard: None,
+                });
             }
             LoggerBackend::File { directory, prefix } => {
                 let file_appender = tracing_appender::rolling::hourly(

@@ -119,12 +127,39 @@ impl ServiceCore for Logger {
                     .with_level(true)
                     .with_writer(non_blocking);
                 registry_init!(layer, config.format, config.level);
-                Ok(Self(Some(_guard)))
+                Ok(Self {
+                    service_state,
+                    worker_guard: Some(_guard),
+                })
             }

     async fn run(self) -> Result<(), overwatch_rs::DynError> {
+        let Self {
+            service_state,
+            worker_guard,
+        } = self;
         // keep the handle alive without stressing the runtime
-        futures::pending!();
+        let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
+        loop {
+            if let Some(msg) = lifecycle_stream.next().await {
+                match msg {
+                    LifecycleMessage::Shutdown(sender) => {
+                        // flush pending logs before signaling message processing
+                        drop(worker_guard);
+                        if sender.send(()).is_err() {
+                            error!(
+                                "Error sending successful shutdown signal from service {}",
+                                Self::SERVICE_ID
+                            );
+                        }
+                        break;
+                    }
+                    LifecycleMessage::Kill => {
+                        break;
+                    }
+                }
+            }
+        }
         Ok(())
     }
 }

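A note on the logger's shutdown arm above: `tracing_appender::non_blocking` hands back a `WorkerGuard` alongside the writer, and dropping that guard is what flushes buffered log lines, which is why `worker_guard` is dropped before the shutdown ack is sent. An illustrative, self-contained construction (stdout writer chosen arbitrarily, not the service's actual configuration):

fn main() {
    // tracing-appender returns a guard with the non-blocking writer; dropping
    // the guard flushes buffered log lines, matching the shutdown arm above.
    let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
    let _ = writer; // would normally be installed in a tracing-subscriber layer
    drop(guard); // flush before exiting
}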
@@ -12,7 +12,7 @@ futures = "0.3"
 linked-hash-map = { version = "0.5.6", optional = true }
 nomos-network = { path = "../network" }
 nomos-core = { path = "../../nomos-core" }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 rand = { version = "0.8", optional = true }
 serde = { version = "1.0", features = ["derive"] }
 thiserror = "1.0"

@@ -23,7 +23,7 @@ chrono = "0.4"

 [dev-dependencies]
 nomos-log = { path = "../log" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 tokio = { version = "1", features = ["full"] }
 blake2 = "0.10"

@@ -14,13 +14,15 @@ use tokio::sync::oneshot::Sender;
 use crate::network::NetworkAdapter;
 use backend::{MemPool, Status};
 use nomos_core::block::BlockId;
-use nomos_network::NetworkService;
+use nomos_network::{NetworkMsg, NetworkService};
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::{
     handle::ServiceStateHandle,
     relay::{OutboundRelay, Relay, RelayMessage},
     state::{NoOperator, NoState},
     ServiceCore, ServiceData, ServiceId,
 };
+use tracing::error;

 pub struct MempoolService<N, P, D>
 where

@@ -168,7 +170,7 @@ where
         ..
     } = self;

-        let network_relay: OutboundRelay<_> = network_relay
+        let mut network_relay: OutboundRelay<_> = network_relay
             .connect()
             .await
             .expect("Relay connection with NetworkService should succeed");

@@ -180,69 +182,124 @@ where

         let adapter = adapter.await;

         let mut network_items = adapter.transactions_stream().await;
+        let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();

         loop {
             tokio::select! {
                 Some(msg) = service_state.inbound_relay.recv() => {
-                    match msg {
-                        MempoolMsg::Add { item, key, reply_channel } => {
-                            match pool.add_item(key, item.clone()) {
-                                Ok(_id) => {
-                                    // Broadcast the item to the network
-                                    let net = network_relay.clone();
-                                    let settings = service_state.settings_reader.get_updated_settings().network;
-                                    // move sending to a new task so local operations can complete in the meantime
-                                    tokio::spawn(async move {
-                                        let adapter = N::new(settings, net).await;
-                                        adapter.send(item).await;
-                                    });
-                                    if let Err(e) = reply_channel.send(Ok(())) {
-                                        tracing::debug!("Failed to send reply to AddTx: {:?}", e);
-                                    }
-                                }
-                                Err(e) => {
-                                    tracing::debug!("could not add tx to the pool due to: {}", e);
-                                }
-                            }
-                        }
-                        MempoolMsg::View { ancestor_hint, reply_channel } => {
-                            reply_channel.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
-                                tracing::debug!("could not send back pool view")
-                            });
-                        }
-                        MempoolMsg::MarkInBlock { ids, block } => {
-                            pool.mark_in_block(&ids, block);
-                        }
-                        #[cfg(test)]
-                        MempoolMsg::BlockItems { block, reply_channel } => {
-                            reply_channel.send(pool.block_items(block)).unwrap_or_else(|_| {
-                                tracing::debug!("could not send back block items")
-                            });
-                        }
-                        MempoolMsg::Prune { ids } => { pool.prune(&ids); },
-                        MempoolMsg::Metrics { reply_channel } => {
-                            let metrics = MempoolMetrics {
-                                pending_items: pool.pending_item_count(),
-                                last_item_timestamp: pool.last_item_timestamp(),
-                            };
-                            reply_channel.send(metrics).unwrap_or_else(|_| {
-                                tracing::debug!("could not send back mempool metrics")
-                            });
-                        }
-                        MempoolMsg::Status {
-                            items, reply_channel
-                        } => {
-                            reply_channel.send(pool.status(&items)).unwrap_or_else(|_| {
-                                tracing::debug!("could not send back mempool status")
-                            });
-                        }
-                    }
+                    Self::handle_mempool_message(msg, &mut pool, &mut network_relay, &mut service_state).await;
                 }
                 Some((key, item )) = network_items.next() => {
                     pool.add_item(key, item).unwrap_or_else(|e| {
                         tracing::debug!("could not add item to the pool due to: {}", e)
                     });
                 }
+                Some(msg) = lifecycle_stream.next() => {
+                    if Self::should_stop_service(msg).await {
+                        break;
+                    }
+                }
             }
         }
+        Ok(())
+    }
+}
+
+impl<N, P, D> MempoolService<N, P, D>
+where
+    P: MemPool + Send + 'static,
+    P::Settings: Clone + Send + Sync + 'static,
+    N::Settings: Clone + Send + Sync + 'static,
+    P::Item: Clone + Debug + Send + Sync + 'static,
+    P::Key: Debug + Send + Sync + 'static,
+    N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
+    D: Discriminant + Send,
+{
+    async fn should_stop_service(message: LifecycleMessage) -> bool {
+        match message {
+            LifecycleMessage::Shutdown(sender) => {
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
+
+    async fn handle_mempool_message(
+        message: MempoolMsg<P::Item, P::Key>,
+        pool: &mut P,
+        network_relay: &mut OutboundRelay<NetworkMsg<N::Backend>>,
+        service_state: &mut ServiceStateHandle<Self>,
+    ) {
+        match message {
+            MempoolMsg::Add {
+                item,
+                key,
+                reply_channel,
+            } => {
+                match pool.add_item(key, item.clone()) {
+                    Ok(_id) => {
+                        // Broadcast the item to the network
+                        let net = network_relay.clone();
+                        let settings = service_state.settings_reader.get_updated_settings().network;
+                        // move sending to a new task so local operations can complete in the meantime
+                        tokio::spawn(async move {
+                            let adapter = N::new(settings, net).await;
+                            adapter.send(item).await;
+                        });
+                        if let Err(e) = reply_channel.send(Ok(())) {
+                            tracing::debug!("Failed to send reply to AddTx: {:?}", e);
+                        }
+                    }
+                    Err(e) => {
+                        tracing::debug!("could not add tx to the pool due to: {}", e);
+                    }
+                }
+            }
+            MempoolMsg::View {
+                ancestor_hint,
+                reply_channel,
+            } => {
+                reply_channel
+                    .send(pool.view(ancestor_hint))
+                    .unwrap_or_else(|_| tracing::debug!("could not send back pool view"));
+            }
+            MempoolMsg::MarkInBlock { ids, block } => {
+                pool.mark_in_block(&ids, block);
+            }
+            #[cfg(test)]
+            MempoolMsg::BlockItems {
+                block,
+                reply_channel,
+            } => {
+                reply_channel
+                    .send(pool.block_items(block))
+                    .unwrap_or_else(|_| tracing::debug!("could not send back block items"));
+            }
+            MempoolMsg::Prune { ids } => {
+                pool.prune(&ids);
+            }
+            MempoolMsg::Metrics { reply_channel } => {
+                let metrics = MempoolMetrics {
+                    pending_items: pool.pending_item_count(),
+                    last_item_timestamp: pool.last_item_timestamp(),
+                };
+                reply_channel
+                    .send(metrics)
+                    .unwrap_or_else(|_| tracing::debug!("could not send back mempool metrics"));
+            }
+            MempoolMsg::Status {
+                items,
+                reply_channel,
+            } => {
+                reply_channel
+                    .send(pool.status(&items))
+                    .unwrap_or_else(|_| tracing::debug!("could not send back mempool status"));
             }
         }
     }

@@ -14,9 +14,10 @@ async-graphql = { version = "5", optional = true, features = ["tracing"] }
 async-trait = "0.1"
 bytes = "1.3"
 clap = { version = "4", features = ["derive", "env"], optional = true }
+futures = "0.3"
 nomos-http = { path = "../http", optional = true }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
-overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
+overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
 once_cell = "1.16"
 parking_lot = "0.12"
 prometheus = "0.13"

@@ -27,7 +28,6 @@ tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
 tower-http = { version = "0.3", features = ["cors", "trace"], optional = true }
 thiserror = "1"
-futures = "0.3"

 [features]
 default = []

@@ -1,16 +1,23 @@
+// std
 use std::fmt::Debug;

+// crates
+use futures::StreamExt;
+use tracing::error;
+
+// internal
+use overwatch_rs::services::life_cycle::LifecycleMessage;
+use overwatch_rs::services::{
+    handle::ServiceStateHandle,
+    relay::RelayMessage,
+    state::{NoOperator, NoState},
+    ServiceCore, ServiceData, ServiceId,
+};
+
 pub mod backend;
 pub mod frontend;
 pub mod types;

-use overwatch_rs::services::{
-    handle::ServiceStateHandle,
-    relay::{InboundRelay, RelayMessage},
-    state::{NoOperator, NoState},
-    ServiceCore, ServiceData, ServiceId,
-};
-
 #[async_trait::async_trait]
 pub trait MetricsBackend {
     type MetricsData: Clone + Send + Sync + Debug + 'static;

@@ -22,7 +29,7 @@ pub trait MetricsBackend {
 }

 pub struct MetricsService<Backend: MetricsBackend> {
-    inbound_relay: InboundRelay<MetricsMessage<Backend::MetricsData>>,
+    service_state: ServiceStateHandle<Self>,
     backend: Backend,
 }

@@ -128,6 +135,35 @@ impl<Backend: MetricsBackend> MetricsService<Backend> {
     ) {
         backend.update(service_id, metrics).await;
     }
+
+    async fn handle_message(message: MetricsMessage<Backend::MetricsData>, backend: &mut Backend) {
+        match message {
+            MetricsMessage::Load {
+                service_id,
+                reply_channel,
+            } => {
+                MetricsService::handle_load(backend, &service_id, reply_channel).await;
+            }
+            MetricsMessage::Update { service_id, data } => {
+                MetricsService::handle_update(backend, &service_id, data).await;
+            }
+        }
+    }
+
+    async fn should_stop_service(message: LifecycleMessage) -> bool {
+        match message {
+            LifecycleMessage::Shutdown(sender) => {
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
 }

 #[async_trait::async_trait]

@@ -135,28 +171,32 @@ impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for MetricsSer
     fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
         let settings = service_state.settings_reader.get_updated_settings();
         let backend = Backend::init(settings);
-        let inbound_relay = service_state.inbound_relay;
         Ok(Self {
-            inbound_relay,
+            service_state,
             backend,
         })
     }

     async fn run(self) -> Result<(), overwatch_rs::DynError> {
         let Self {
-            mut inbound_relay,
+            service_state:
+                ServiceStateHandle {
+                    mut inbound_relay,
+                    lifecycle_handle,
+                    ..
+                },
             mut backend,
         } = self;
-        while let Some(message) = inbound_relay.recv().await {
-            match message {
-                MetricsMessage::Load {
-                    service_id,
-                    reply_channel,
-                } => {
-                    MetricsService::handle_load(&backend, &service_id, reply_channel).await;
-                }
-                MetricsMessage::Update { service_id, data } => {
-                    MetricsService::handle_update(&mut backend, &service_id, data).await;
-                }
+        let mut lifecycle_stream = lifecycle_handle.message_stream();
+        loop {
+            tokio::select! {
+                Some(message) = inbound_relay.recv() => {
+                    Self::handle_message(message, &mut backend).await;
+                }
+                Some(message) = lifecycle_stream.next() => {
+                    if Self::should_stop_service(message).await {
+                        break;
+                    }
+                }
             }
         }
     }

@@ -10,7 +10,7 @@ async-trait = "0.1"
 bytes = "1.2"
 chrono = { version = "0.4", optional = true }
 humantime-serde = { version = "1", optional = true }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 multiaddr = "0.15"
 serde = { version = "1.0", features = ["derive"] }
 sscanf = { version = "0.4", optional = true }

@@ -4,17 +4,20 @@ use std::fmt::{self, Debug};

 // crates
 use async_trait::async_trait;
+use futures::StreamExt;
 use serde::{Deserialize, Serialize};
 use tokio::sync::broadcast;
 use tokio::sync::oneshot;
 // internal
 use backends::NetworkBackend;
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::{
     handle::ServiceStateHandle,
     relay::RelayMessage,
     state::{NoOperator, ServiceState},
     ServiceCore, ServiceData, ServiceId,
 };
+use tracing::error;

 pub enum NetworkMsg<B: NetworkBackend> {
     Process(B::Message),

|
||||||
|
|
||||||
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||||
let Self {
|
let Self {
|
||||||
service_state: ServiceStateHandle {
|
service_state:
|
||||||
mut inbound_relay, ..
|
ServiceStateHandle {
|
||||||
},
|
mut inbound_relay,
|
||||||
|
lifecycle_handle,
|
||||||
|
..
|
||||||
|
},
|
||||||
mut backend,
|
mut backend,
|
||||||
} = self;
|
} = self;
|
||||||
|
let mut lifecycle_stream = lifecycle_handle.message_stream();
|
||||||
while let Some(msg) = inbound_relay.recv().await {
|
loop {
|
||||||
match msg {
|
tokio::select! {
|
||||||
NetworkMsg::Process(msg) => {
|
Some(msg) = inbound_relay.recv() => {
|
||||||
// split sending in two steps to help the compiler understand we do not
|
Self::handle_network_service_message(msg, &mut backend).await;
|
||||||
// need to hold an instance of &I (which is not send) across an await point
|
}
|
||||||
let _send = backend.process(msg);
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
_send.await
|
if Self::should_stop_service(msg).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
NetworkMsg::Subscribe { kind, sender } => sender
|
|
||||||
.send(backend.subscribe(kind).await)
|
|
||||||
.unwrap_or_else(|_| {
|
|
||||||
tracing::warn!(
|
|
||||||
"client hung up before a subscription handle could be established"
|
|
||||||
)
|
|
||||||
}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<B> NetworkService<B>
|
||||||
|
where
|
||||||
|
B: NetworkBackend + Send + 'static,
|
||||||
|
B::State: Send + Sync,
|
||||||
|
{
|
||||||
|
async fn handle_network_service_message(msg: NetworkMsg<B>, backend: &mut B) {
|
||||||
|
match msg {
|
||||||
|
NetworkMsg::Process(msg) => {
|
||||||
|
// split sending in two steps to help the compiler understand we do not
|
||||||
|
// need to hold an instance of &I (which is not send) across an await point
|
||||||
|
let _send = backend.process(msg);
|
||||||
|
_send.await
|
||||||
|
}
|
||||||
|
NetworkMsg::Subscribe { kind, sender } => sender
|
||||||
|
.send(backend.subscribe(kind).await)
|
||||||
|
.unwrap_or_else(|_| {
|
||||||
|
tracing::warn!(
|
||||||
|
"client hung up before a subscription handle could be established"
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn should_stop_service(msg: LifecycleMessage) -> bool {
|
||||||
|
match msg {
|
||||||
|
LifecycleMessage::Kill => true,
|
||||||
|
LifecycleMessage::Shutdown(signal_sender) => {
|
||||||
|
// TODO: Maybe add a call to backend to handle this. Maybe trying to save unprocessed messages?
|
||||||
|
if signal_sender.send(()).is_err() {
|
||||||
|
error!(
|
||||||
|
"Error sending successful shutdown signal from service {}",
|
||||||
|
Self::SERVICE_ID
|
||||||
|
);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<B: NetworkBackend> Clone for NetworkConfig<B> {
|
impl<B: NetworkBackend> Clone for NetworkConfig<B> {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
NetworkConfig {
|
NetworkConfig {
|
||||||
|
|
|
@@ -7,9 +7,10 @@ edition = "2021"

 [dependencies]
 async-trait = "0.1"
+futures = "0.3"
 tokio = { version = "1", features = ["sync"] }
 bytes = "1.2"
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 serde = "1.0"
 sled = { version = "0.34", optional = true }
 thiserror = "1.0"

@@ -6,15 +6,18 @@ use std::marker::PhantomData;
 // crates
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::StreamExt;
 use overwatch_rs::services::handle::ServiceStateHandle;
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 // internal
 use backends::StorageBackend;
 use backends::{StorageSerde, StorageTransaction};
+use overwatch_rs::services::life_cycle::LifecycleMessage;
 use overwatch_rs::services::relay::RelayMessage;
 use overwatch_rs::services::state::{NoOperator, NoState};
 use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
+use tracing::error;

 /// Storage message that maps to [`StorageBackend`] trait
 pub enum StorageMsg<Backend: StorageBackend> {

@@ -162,6 +165,39 @@ pub struct StorageService<Backend: StorageBackend + Send + Sync + 'static> {
 }

 impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
+    async fn should_stop_service(msg: LifecycleMessage) -> bool {
+        match msg {
+            LifecycleMessage::Shutdown(sender) => {
+                // TODO: Try to finish pending transactions if any and close connections properly
+                if sender.send(()).is_err() {
+                    error!(
+                        "Error sending successful shutdown signal from service {}",
+                        Self::SERVICE_ID
+                    );
+                }
+                true
+            }
+            LifecycleMessage::Kill => true,
+        }
+    }
+
+    async fn handle_storage_message(msg: StorageMsg<Backend>, backend: &mut Backend) {
+        if let Err(e) = match msg {
+            StorageMsg::Load { key, reply_channel } => {
+                Self::handle_load(backend, key, reply_channel).await
+            }
+            StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
+            StorageMsg::Remove { key, reply_channel } => {
+                Self::handle_remove(backend, key, reply_channel).await
+            }
+            StorageMsg::Execute {
+                transaction,
+                reply_channel,
+            } => Self::handle_execute(backend, transaction, reply_channel).await,
+        } {
+            // TODO: add proper logging
+            println!("{e}");
+        }
+    }
     /// Handle load message
     async fn handle_load(
         backend: &mut Backend,

@@ -243,27 +279,25 @@ impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageSer
     async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
         let Self {
             mut backend,
-            service_state: ServiceStateHandle {
-                mut inbound_relay, ..
-            },
+            service_state:
+                ServiceStateHandle {
+                    mut inbound_relay,
+                    lifecycle_handle,
+                    ..
+                },
         } = self;
+        let mut lifecycle_stream = lifecycle_handle.message_stream();
         let backend = &mut backend;
-        while let Some(msg) = inbound_relay.recv().await {
-            if let Err(e) = match msg {
-                StorageMsg::Load { key, reply_channel } => {
-                    Self::handle_load(backend, key, reply_channel).await
-                }
-                StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
-                StorageMsg::Remove { key, reply_channel } => {
-                    Self::handle_remove(backend, key, reply_channel).await
-                }
-                StorageMsg::Execute {
-                    transaction,
-                    reply_channel,
-                } => Self::handle_execute(backend, transaction, reply_channel).await,
-            } {
-                // TODO: add proper logging
-                println!("{e}");
+        loop {
+            tokio::select! {
+                Some(msg) = inbound_relay.recv() => {
+                    Self::handle_storage_message(msg, backend).await;
+                }
+                Some(msg) = lifecycle_stream.next() => {
+                    if Self::should_stop_service(msg).await {
+                        break;
+                    }
+                }
             }
         }
         Ok(())

@@ -10,7 +10,7 @@ nomos-consensus = { path = "../nomos-services/consensus" }
 nomos-network = { path = "../nomos-services/network", features = ["libp2p"]}
 nomos-log = { path = "../nomos-services/log" }
 nomos-http = { path = "../nomos-services/http", features = ["http"] }
-overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "6e6678b" }
+overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
 nomos-core = { path = "../nomos-core" }
 consensus-engine = { path = "../consensus-engine", features = ["serde"] }
 nomos-mempool = { path = "../nomos-services/mempool", features = ["mock", "libp2p"] }