From 90165084ea6dfac70d15cfbf46fd2d922ca89046 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Wed, 8 Nov 2023 16:38:22 +0800 Subject: [PATCH] update overwatch version --- nodes/mixnode/src/services/mixnet.rs | 7 ++- nomos-services/consensus/src/lib.rs | 6 +-- nomos-services/data-availability/src/lib.rs | 7 ++- nomos-services/http/examples/axum.rs | 6 +-- nomos-services/http/examples/graphql.rs | 12 ++--- nomos-services/http/src/bridge.rs | 11 ++--- nomos-services/http/src/http.rs | 49 +++++++++++++------ nomos-services/log/src/lib.rs | 5 +- nomos-services/mempool/src/lib.rs | 6 +-- nomos-services/metrics/examples/graphql.rs | 6 +-- .../metrics/src/frontend/graphql/mod.rs | 8 +-- nomos-services/metrics/src/lib.rs | 5 +- nomos-services/network/src/lib.rs | 5 +- nomos-services/storage/src/lib.rs | 9 ++-- nomos-services/system-sig/src/lib.rs | 10 ++-- 15 files changed, 86 insertions(+), 66 deletions(-) diff --git a/nodes/mixnode/src/services/mixnet.rs b/nodes/mixnode/src/services/mixnet.rs index cc7bdc42..d45218f3 100644 --- a/nodes/mixnode/src/services/mixnet.rs +++ b/nodes/mixnode/src/services/mixnet.rs @@ -2,8 +2,7 @@ use mixnet_node::{MixnetNode, MixnetNodeConfig}; use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::NoMessage; use overwatch_rs::services::state::{NoOperator, NoState}; -use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; -use overwatch_rs::DynError; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId}; pub struct MixnetNodeService(MixnetNode); @@ -17,12 +16,12 @@ impl ServiceData for MixnetNodeService { #[async_trait::async_trait] impl ServiceCore for MixnetNodeService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let settings: Self::Settings = service_state.settings_reader.get_updated_settings(); Ok(Self(MixnetNode::new(settings))) } - async fn run(self) -> Result<(), DynError> { + async fn run(self) -> Result<(), ServiceError> { if let Err(_e) = self.0.run().await { todo!("Errors should match"); } diff --git a/nomos-services/consensus/src/lib.rs b/nomos-services/consensus/src/lib.rs index 26627c4f..3db54a05 100644 --- a/nomos-services/consensus/src/lib.rs +++ b/nomos-services/consensus/src/lib.rs @@ -51,7 +51,7 @@ use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); @@ -202,7 +202,7 @@ where BS::Settings: Send + Sync + 'static, Storage: StorageBackend + Send + Sync + 'static, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay(); @@ -217,7 +217,7 @@ where }) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let network_relay: OutboundRelay<_> = self .network_relay .connect() diff --git a/nomos-services/data-availability/src/lib.rs b/nomos-services/data-availability/src/lib.rs index edffb8fb..52be24e2 100644 --- a/nomos-services/data-availability/src/lib.rs +++ b/nomos-services/data-availability/src/lib.rs @@ -2,7 +2,6 @@ pub mod backend; pub mod network; // std -use overwatch_rs::DynError; use std::fmt::{Debug, Formatter}; // crates use futures::StreamExt; @@ -17,7 +16,7 @@ use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::state::{NoOperator, NoState}; -use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId}; use tracing::error; pub struct DataAvailabilityService @@ -155,7 +154,7 @@ where Network: NetworkAdapter + Send + Sync, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); let backend = Backend::new(settings.backend); @@ -168,7 +167,7 @@ where }) } - async fn run(self) -> Result<(), DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { mut service_state, backend, diff --git a/nomos-services/http/examples/axum.rs b/nomos-services/http/examples/axum.rs index 82583577..ceb92f32 100644 --- a/nomos-services/http/examples/axum.rs +++ b/nomos-services/http/examples/axum.rs @@ -12,7 +12,7 @@ use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; use parking_lot::Mutex; @@ -40,14 +40,14 @@ impl ServiceData for DummyService { #[async_trait::async_trait] impl ServiceCore for DummyService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { counter: Default::default(), service_state, }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { counter, service_state: ServiceStateHandle { diff --git a/nomos-services/http/examples/graphql.rs b/nomos-services/http/examples/graphql.rs index 9b9aca26..0db8cc96 100644 --- a/nomos-services/http/examples/graphql.rs +++ b/nomos-services/http/examples/graphql.rs @@ -13,7 +13,7 @@ use overwatch_rs::services::relay::{OutboundRelay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle}; use parking_lot::Mutex; @@ -66,7 +66,7 @@ where let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await { Ok(r) => r, Err(err) => { - tracing::error!(err); + tracing::error!(%err); err.to_string() } }; @@ -85,7 +85,7 @@ async fn handle_graphql_req( >, payload: Option, dummy: OutboundRelay, -) -> Result { +) -> Result> { // TODO: Move to the graphql frontend as a helper function? let payload = payload.ok_or("empty payload")?; let req = async_graphql::http::receive_batch_json(&payload[..]) @@ -142,14 +142,14 @@ impl ServiceData for DummyGraphqlService { #[async_trait::async_trait] impl ServiceCore for DummyGraphqlService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { service_state, val: Arc::new(Mutex::new(0)), }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { service_state: ServiceStateHandle { mut inbound_relay, .. @@ -179,7 +179,7 @@ pub struct Args { http: AxumBackendSettings, } -fn main() -> Result<(), overwatch_rs::DynError> { +fn main() -> Result<(), Box> { tracing_subscriber::fmt::fmt().with_file(false).init(); let settings = Args::parse(); diff --git a/nomos-services/http/src/bridge.rs b/nomos-services/http/src/bridge.rs index 04bc0ad4..ed092d88 100644 --- a/nomos-services/http/src/bridge.rs +++ b/nomos-services/http/src/bridge.rs @@ -8,16 +8,15 @@ use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::relay::{NoMessage, OutboundRelay}; use overwatch_rs::services::{ state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; -use overwatch_rs::DynError; use tokio::sync::mpsc::{channel, Receiver}; use crate::backends::HttpBackend; use crate::http::{HttpMethod, HttpMsg, HttpRequest, HttpService}; pub type HttpBridgeRunner = - Box> + Send + Unpin + 'static>; + Box> + Send + Unpin + 'static>; // TODO: If we can get rid of the clone bound on here remove Arc. // For now as we bind this through the settings we need to keep it. @@ -35,7 +34,7 @@ pub async fn build_http_bridge( handle: overwatch_rs::overwatch::handle::OverwatchHandle, method: HttpMethod, path: P, -) -> Result<(OutboundRelay, Receiver), overwatch_rs::DynError> +) -> Result<(OutboundRelay, Receiver), ServiceError> where S: ServiceCore + Send + Sync + 'static, B: HttpBackend + Send + Sync + 'static, @@ -89,7 +88,7 @@ impl ServiceData for HttpBridgeService { #[async_trait] impl ServiceCore for HttpBridgeService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let runners = service_state.settings_reader.get_updated_settings().bridges; let runners: Vec<_> = runners .into_iter() @@ -98,7 +97,7 @@ impl ServiceCore for HttpBridgeService { Ok(Self { runners }) } - async fn run(self) -> Result<(), DynError> { + async fn run(self) -> Result<(), ServiceError> { futures::future::join_all(self.runners).await; Ok(()) } diff --git a/nomos-services/http/src/http.rs b/nomos-services/http/src/http.rs index 0740e412..fafa89fb 100644 --- a/nomos-services/http/src/http.rs +++ b/nomos-services/http/src/http.rs @@ -14,7 +14,7 @@ use overwatch_rs::services::{ handle::ServiceStateHandle, relay::{InboundRelay, RelayMessage}, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc::Sender, oneshot}; @@ -124,17 +124,17 @@ where B: HttpBackend + Send + Sync + 'static, ::Error: Error + Send + Sync + 'static, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let inbound_relay = service_state.inbound_relay; ::new(service_state.settings_reader.get_updated_settings().backend) .map(|backend| Self { backend, inbound_relay, }) - .map_err(|e| Box::new(e) as Box) + .map_err(|e| ServiceError::Service(Box::new(e))) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let Self { backend, mut inbound_relay, @@ -169,7 +169,7 @@ where if stop_tx.send(()).is_err() { tracing::error!("HTTP service: failed to send stop signal to HTTP backend."); } - e + ServiceError::Service(e) }) } } @@ -188,25 +188,44 @@ pub async fn handle_graphql_req( payload: Option, relay: OutboundRelay, f: F, -) -> Result +) -> Result where F: FnOnce( async_graphql::Request, oneshot::Sender, - ) -> Result, + ) -> Result, { - let payload = payload.ok_or("empty payload")?; + #[derive(Debug)] + struct EmptyPayload; + + impl core::fmt::Display for EmptyPayload { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + write!(f, "empty payload") + } + } + + impl Error for EmptyPayload {} + + let payload = payload + .ok_or(EmptyPayload) + .map_err(|e| ServiceError::Service(Box::new(e)))?; let req = async_graphql::http::receive_batch_json(&payload[..]) - .await? - .into_single()?; + .await + .map_err(|e| ServiceError::Service(Box::new(e)))? + .into_single() + .map_err(|e| ServiceError::Service(Box::new(e)))?; let (sender, receiver) = oneshot::channel(); - relay.send(f(req, sender)?).await.map_err(|_| { - tracing::error!(err = "failed to send graphql request to the http service"); - "failed to send graphql request to the frontend" - })?; + relay + .send(f(req, sender)?) + .await + .map_err(|e| { + tracing::error!(err = "failed to send graphql request to the http service"); + e + }) + .map_err(|e| ServiceError::RelayError(e.0))?; let res = receiver.await.unwrap(); - let res = serde_json::to_string(&res)?; + let res = serde_json::to_string(&res).map_err(|e| ServiceError::Service(Box::new(e)))?; Ok(res) } diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index 8ccfa7fa..c407f46f 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -1,5 +1,6 @@ // std use futures::StreamExt; +use overwatch_rs::services::ServiceError; use std::fmt::{Debug, Formatter}; use std::io::Write; use std::net::SocketAddr; @@ -133,7 +134,7 @@ macro_rules! registry_init { #[async_trait::async_trait] impl ServiceCore for Logger { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let config = service_state.settings_reader.get_updated_settings(); let (non_blocking, _guard) = match config.backend { LoggerBackend::Gelf { addr } => { @@ -170,7 +171,7 @@ impl ServiceCore for Logger { }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { service_state, worker_guard, diff --git a/nomos-services/mempool/src/lib.rs b/nomos-services/mempool/src/lib.rs index ca887d8b..8a9a2a88 100644 --- a/nomos-services/mempool/src/lib.rs +++ b/nomos-services/mempool/src/lib.rs @@ -21,13 +21,13 @@ use crate::network::NetworkAdapter; use backend::{MemPool, Status}; use nomos_core::block::BlockId; use nomos_network::{NetworkMsg, NetworkService}; -use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::{ handle::ServiceStateHandle, relay::{OutboundRelay, Relay, RelayMessage}, state::{NoOperator, NoState}, ServiceCore, ServiceData, ServiceId, }; +use overwatch_rs::services::{life_cycle::LifecycleMessage, ServiceError}; use tracing::error; pub struct MempoolService @@ -159,7 +159,7 @@ where N: NetworkAdapter + Send + Sync + 'static, D: Discriminant + Send, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); Ok(Self { @@ -170,7 +170,7 @@ where }) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let Self { mut service_state, network_relay, diff --git a/nomos-services/metrics/examples/graphql.rs b/nomos-services/metrics/examples/graphql.rs index c5fe0c06..105ded6d 100644 --- a/nomos-services/metrics/examples/graphql.rs +++ b/nomos-services/metrics/examples/graphql.rs @@ -14,7 +14,7 @@ use overwatch_rs::{ handle::{ServiceHandle, ServiceStateHandle}, relay::{NoMessage, Relay}, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }, }; @@ -67,13 +67,13 @@ impl + Clone + Send + Sync + where Backend::MetricsData: async_graphql::OutputType, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let backend_channel: Relay> = service_state.overwatch_handle.relay(); Ok(Self { backend_channel }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { + async fn run(self) -> Result<(), ServiceError> { let replay = self.backend_channel.connect().await.map_err(|e| { tracing::error!(err = ?e, "Metrics Updater: relay connect error"); e diff --git a/nomos-services/metrics/src/frontend/graphql/mod.rs b/nomos-services/metrics/src/frontend/graphql/mod.rs index 39ffd60f..4d564ca7 100644 --- a/nomos-services/metrics/src/frontend/graphql/mod.rs +++ b/nomos-services/metrics/src/frontend/graphql/mod.rs @@ -10,7 +10,7 @@ use overwatch_rs::services::relay::{Relay, RelayMessage}; use overwatch_rs::services::{ handle::ServiceStateHandle, state::{NoOperator, NoState}, - ServiceCore, ServiceData, ServiceId, + ServiceCore, ServiceData, ServiceError, ServiceId, }; /// Configuration for the GraphQl Server @@ -68,7 +68,7 @@ where { Ok(r) => r, Err(err) => { - tracing::error!(err); + tracing::error!(%err); err.to_string() } }; @@ -148,7 +148,7 @@ impl ServiceCore for Graphql) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let backend_channel: Relay> = service_state.overwatch_handle.relay(); let settings = service_state.settings_reader.get_updated_settings(); @@ -159,7 +159,7 @@ where }) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let max_complexity = self.settings.max_complexity; let max_depth = self.settings.max_depth; let mut inbound_relay = self.service_state.take().unwrap().inbound_relay; diff --git a/nomos-services/metrics/src/lib.rs b/nomos-services/metrics/src/lib.rs index bfa99564..96d89fc6 100644 --- a/nomos-services/metrics/src/lib.rs +++ b/nomos-services/metrics/src/lib.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; // crates use futures::StreamExt; +use overwatch_rs::services::ServiceError; use tracing::error; // internal @@ -168,7 +169,7 @@ impl MetricsService { #[async_trait::async_trait] impl ServiceCore for MetricsService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { let settings = service_state.settings_reader.get_updated_settings(); let backend = Backend::init(settings); Ok(Self { @@ -177,7 +178,7 @@ impl ServiceCore for MetricsSer }) } - async fn run(self) -> Result<(), overwatch_rs::DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { service_state: ServiceStateHandle { diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index 6715424f..d099e47b 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -5,6 +5,7 @@ use std::fmt::{self, Debug}; // crates use async_trait::async_trait; use futures::StreamExt; +use overwatch_rs::services::ServiceError; use serde::{Deserialize, Serialize}; use tokio::sync::broadcast; use tokio::sync::oneshot; @@ -75,7 +76,7 @@ where B: NetworkBackend + Send + 'static, B::State: Send + Sync, { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { backend: ::new( service_state.settings_reader.get_updated_settings().backend, @@ -85,7 +86,7 @@ where }) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let Self { service_state: ServiceStateHandle { diff --git a/nomos-services/storage/src/lib.rs b/nomos-services/storage/src/lib.rs index b2754f03..55a5ceb7 100644 --- a/nomos-services/storage/src/lib.rs +++ b/nomos-services/storage/src/lib.rs @@ -16,7 +16,7 @@ use backends::{StorageSerde, StorageTransaction}; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::RelayMessage; use overwatch_rs::services::state::{NoOperator, NoState}; -use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId}; use tracing::error; /// Storage message that maps to [`StorageBackend`] trait @@ -273,14 +273,15 @@ impl StorageService { #[async_trait] impl ServiceCore for StorageService { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { - backend: Backend::new(service_state.settings_reader.get_updated_settings())?, + backend: Backend::new(service_state.settings_reader.get_updated_settings()) + .map_err(|e| ServiceError::Service(Box::new(e)))?, service_state, }) } - async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + async fn run(mut self) -> Result<(), ServiceError> { let Self { mut backend, service_state: diff --git a/nomos-services/system-sig/src/lib.rs b/nomos-services/system-sig/src/lib.rs index ff4d54b2..a7d9e3ef 100644 --- a/nomos-services/system-sig/src/lib.rs +++ b/nomos-services/system-sig/src/lib.rs @@ -9,8 +9,7 @@ use overwatch_rs::services::handle::ServiceStateHandle; use overwatch_rs::services::life_cycle::LifecycleMessage; use overwatch_rs::services::relay::NoMessage; use overwatch_rs::services::state::{NoOperator, NoState}; -use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId}; -use overwatch_rs::DynError; +use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId}; pub struct SystemSig { service_state: ServiceStateHandle, @@ -48,13 +47,14 @@ impl ServiceData for SystemSig { #[async_trait::async_trait] impl ServiceCore for SystemSig { - fn init(service_state: ServiceStateHandle) -> Result { + fn init(service_state: ServiceStateHandle) -> Result { Ok(Self { service_state }) } - async fn run(self) -> Result<(), DynError> { + async fn run(self) -> Result<(), ServiceError> { let Self { service_state } = self; - let mut ctrlc = async_ctrlc::CtrlC::new()?; + let mut ctrlc = + async_ctrlc::CtrlC::new().map_err(|e| ServiceError::Service(Box::new(e)))?; let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { tokio::select! {