update overwatch version

This commit is contained in:
Al Liu 2023-11-08 16:38:22 +08:00
parent f6ea2fa59f
commit 90165084ea
No known key found for this signature in database
GPG Key ID: C8AE9A6E0166923E
15 changed files with 86 additions and 66 deletions

View File

@ -2,8 +2,7 @@ use mixnet_node::{MixnetNode, MixnetNodeConfig};
use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::NoMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
pub struct MixnetNodeService(MixnetNode);
@ -17,12 +16,12 @@ impl ServiceData for MixnetNodeService {
#[async_trait::async_trait]
impl ServiceCore for MixnetNodeService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let settings: Self::Settings = service_state.settings_reader.get_updated_settings();
Ok(Self(MixnetNode::new(settings)))
}
async fn run(self) -> Result<(), DynError> {
async fn run(self) -> Result<(), ServiceError> {
if let Err(_e) = self.0.run().await {
todo!("Errors should match");
}

View File

@ -51,7 +51,7 @@ use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
use overwatch_rs::services::{
handle::ServiceStateHandle,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
@ -202,7 +202,7 @@ where
BS::Settings: Send + Sync + 'static,
Storage: StorageBackend + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let network_relay = service_state.overwatch_handle.relay();
let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay();
@ -217,7 +217,7 @@ where
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let network_relay: OutboundRelay<_> = self
.network_relay
.connect()

View File

@ -2,7 +2,6 @@ pub mod backend;
pub mod network;
// std
use overwatch_rs::DynError;
use std::fmt::{Debug, Formatter};
// crates
use futures::StreamExt;
@ -17,7 +16,7 @@ use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use tracing::error;
pub struct DataAvailabilityService<Protocol, Backend, Network>
@ -155,7 +154,7 @@ where
Network:
NetworkAdapter<Blob = Protocol::Blob, Attestation = Protocol::Attestation> + Send + Sync,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
let backend = Backend::new(settings.backend);
@ -168,7 +167,7 @@ where
})
}
async fn run(self) -> Result<(), DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
mut service_state,
backend,

View File

@ -12,7 +12,7 @@ use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use parking_lot::Mutex;
@ -40,14 +40,14 @@ impl ServiceData for DummyService {
#[async_trait::async_trait]
impl ServiceCore for DummyService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
counter: Default::default(),
service_state,
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
counter,
service_state: ServiceStateHandle {

View File

@ -13,7 +13,7 @@ use overwatch_rs::services::relay::{OutboundRelay, RelayMessage};
use overwatch_rs::services::{
handle::ServiceStateHandle,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
use overwatch_rs::{overwatch::OverwatchRunner, services::handle::ServiceHandle};
use parking_lot::Mutex;
@ -66,7 +66,7 @@ where
let res = match handle_graphql_req(&SCHEMA, payload, dummy.clone()).await {
Ok(r) => r,
Err(err) => {
tracing::error!(err);
tracing::error!(%err);
err.to_string()
}
};
@ -85,7 +85,7 @@ async fn handle_graphql_req(
>,
payload: Option<Bytes>,
dummy: OutboundRelay<DummyGraphqlMsg>,
) -> Result<String, overwatch_rs::DynError> {
) -> Result<String, Box<dyn Error + Send + Sync + 'static>> {
// TODO: Move to the graphql frontend as a helper function?
let payload = payload.ok_or("empty payload")?;
let req = async_graphql::http::receive_batch_json(&payload[..])
@ -142,14 +142,14 @@ impl ServiceData for DummyGraphqlService {
#[async_trait::async_trait]
impl ServiceCore for DummyGraphqlService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
service_state,
val: Arc::new(Mutex::new(0)),
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
service_state: ServiceStateHandle {
mut inbound_relay, ..
@ -179,7 +179,7 @@ pub struct Args {
http: AxumBackendSettings,
}
fn main() -> Result<(), overwatch_rs::DynError> {
fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
tracing_subscriber::fmt::fmt().with_file(false).init();
let settings = Args::parse();

View File

@ -8,16 +8,15 @@ use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::relay::{NoMessage, OutboundRelay};
use overwatch_rs::services::{
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
use overwatch_rs::DynError;
use tokio::sync::mpsc::{channel, Receiver};
use crate::backends::HttpBackend;
use crate::http::{HttpMethod, HttpMsg, HttpRequest, HttpService};
pub type HttpBridgeRunner =
Box<dyn Future<Output = Result<(), overwatch_rs::DynError>> + Send + Unpin + 'static>;
Box<dyn Future<Output = Result<(), ServiceError>> + Send + Unpin + 'static>;
// TODO: If we can get rid of the clone bound on here remove Arc.
// For now as we bind this through the settings we need to keep it.
@ -35,7 +34,7 @@ pub async fn build_http_bridge<S, B, P>(
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
method: HttpMethod,
path: P,
) -> Result<(OutboundRelay<S::Message>, Receiver<HttpRequest>), overwatch_rs::DynError>
) -> Result<(OutboundRelay<S::Message>, Receiver<HttpRequest>), ServiceError>
where
S: ServiceCore + Send + Sync + 'static,
B: HttpBackend + Send + Sync + 'static,
@ -89,7 +88,7 @@ impl ServiceData for HttpBridgeService {
#[async_trait]
impl ServiceCore for HttpBridgeService {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let runners = service_state.settings_reader.get_updated_settings().bridges;
let runners: Vec<_> = runners
.into_iter()
@ -98,7 +97,7 @@ impl ServiceCore for HttpBridgeService {
Ok(Self { runners })
}
async fn run(self) -> Result<(), DynError> {
async fn run(self) -> Result<(), ServiceError> {
futures::future::join_all(self.runners).await;
Ok(())
}

View File

@ -14,7 +14,7 @@ use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{InboundRelay, RelayMessage},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc::Sender, oneshot};
@ -124,17 +124,17 @@ where
B: HttpBackend + Send + Sync + 'static,
<B as HttpBackend>::Error: Error + Send + Sync + 'static,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let inbound_relay = service_state.inbound_relay;
<B as HttpBackend>::new(service_state.settings_reader.get_updated_settings().backend)
.map(|backend| Self {
backend,
inbound_relay,
})
.map_err(|e| Box::new(e) as Box<dyn Error + Send + Sync>)
.map_err(|e| ServiceError::Service(Box::new(e)))
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
backend,
mut inbound_relay,
@ -169,7 +169,7 @@ where
if stop_tx.send(()).is_err() {
tracing::error!("HTTP service: failed to send stop signal to HTTP backend.");
}
e
ServiceError::Service(e)
})
}
}
@ -188,25 +188,44 @@ pub async fn handle_graphql_req<M, F>(
payload: Option<Bytes>,
relay: OutboundRelay<M>,
f: F,
) -> Result<String, overwatch_rs::DynError>
) -> Result<String, ServiceError>
where
F: FnOnce(
async_graphql::Request,
oneshot::Sender<async_graphql::Response>,
) -> Result<M, overwatch_rs::DynError>,
) -> Result<M, ServiceError>,
{
let payload = payload.ok_or("empty payload")?;
#[derive(Debug)]
struct EmptyPayload;
impl core::fmt::Display for EmptyPayload {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "empty payload")
}
}
impl Error for EmptyPayload {}
let payload = payload
.ok_or(EmptyPayload)
.map_err(|e| ServiceError::Service(Box::new(e)))?;
let req = async_graphql::http::receive_batch_json(&payload[..])
.await?
.into_single()?;
.await
.map_err(|e| ServiceError::Service(Box::new(e)))?
.into_single()
.map_err(|e| ServiceError::Service(Box::new(e)))?;
let (sender, receiver) = oneshot::channel();
relay.send(f(req, sender)?).await.map_err(|_| {
tracing::error!(err = "failed to send graphql request to the http service");
"failed to send graphql request to the frontend"
})?;
relay
.send(f(req, sender)?)
.await
.map_err(|e| {
tracing::error!(err = "failed to send graphql request to the http service");
e
})
.map_err(|e| ServiceError::RelayError(e.0))?;
let res = receiver.await.unwrap();
let res = serde_json::to_string(&res)?;
let res = serde_json::to_string(&res).map_err(|e| ServiceError::Service(Box::new(e)))?;
Ok(res)
}

View File

@ -1,5 +1,6 @@
// std
use futures::StreamExt;
use overwatch_rs::services::ServiceError;
use std::fmt::{Debug, Formatter};
use std::io::Write;
use std::net::SocketAddr;
@ -133,7 +134,7 @@ macro_rules! registry_init {
#[async_trait::async_trait]
impl ServiceCore for Logger {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let config = service_state.settings_reader.get_updated_settings();
let (non_blocking, _guard) = match config.backend {
LoggerBackend::Gelf { addr } => {
@ -170,7 +171,7 @@ impl ServiceCore for Logger {
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
service_state,
worker_guard,

View File

@ -21,13 +21,13 @@ use crate::network::NetworkAdapter;
use backend::{MemPool, Status};
use nomos_core::block::BlockId;
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
handle::ServiceStateHandle,
relay::{OutboundRelay, Relay, RelayMessage},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
};
use overwatch_rs::services::{life_cycle::LifecycleMessage, ServiceError};
use tracing::error;
pub struct MempoolService<N, P, D>
@ -159,7 +159,7 @@ where
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
D: Discriminant + Send,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let network_relay = service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
Ok(Self {
@ -170,7 +170,7 @@ where
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
mut service_state,
network_relay,

View File

@ -14,7 +14,7 @@ use overwatch_rs::{
handle::{ServiceHandle, ServiceStateHandle},
relay::{NoMessage, Relay},
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
},
};
@ -67,13 +67,13 @@ impl<Backend: MetricsBackend<MetricsData = MetricsData> + Clone + Send + Sync +
where
Backend::MetricsData: async_graphql::OutputType,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let backend_channel: Relay<MetricsService<Backend>> =
service_state.overwatch_handle.relay();
Ok(Self { backend_channel })
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let replay = self.backend_channel.connect().await.map_err(|e| {
tracing::error!(err = ?e, "Metrics Updater: relay connect error");
e

View File

@ -10,7 +10,7 @@ use overwatch_rs::services::relay::{Relay, RelayMessage};
use overwatch_rs::services::{
handle::ServiceStateHandle,
state::{NoOperator, NoState},
ServiceCore, ServiceData, ServiceId,
ServiceCore, ServiceData, ServiceError, ServiceId,
};
/// Configuration for the GraphQl Server
@ -68,7 +68,7 @@ where
{
Ok(r) => r,
Err(err) => {
tracing::error!(err);
tracing::error!(%err);
err.to_string()
}
};
@ -148,7 +148,7 @@ impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for Graphql<Ba
where
Backend::MetricsData: async_graphql::OutputType,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let backend_channel: Relay<MetricsService<Backend>> =
service_state.overwatch_handle.relay();
let settings = service_state.settings_reader.get_updated_settings();
@ -159,7 +159,7 @@ where
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let max_complexity = self.settings.max_complexity;
let max_depth = self.settings.max_depth;
let mut inbound_relay = self.service_state.take().unwrap().inbound_relay;

View File

@ -3,6 +3,7 @@ use std::fmt::Debug;
// crates
use futures::StreamExt;
use overwatch_rs::services::ServiceError;
use tracing::error;
// internal
@ -168,7 +169,7 @@ impl<Backend: MetricsBackend> MetricsService<Backend> {
#[async_trait::async_trait]
impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for MetricsService<Backend> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
let settings = service_state.settings_reader.get_updated_settings();
let backend = Backend::init(settings);
Ok(Self {
@ -177,7 +178,7 @@ impl<Backend: MetricsBackend + Send + Sync + 'static> ServiceCore for MetricsSer
})
}
async fn run(self) -> Result<(), overwatch_rs::DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self {
service_state:
ServiceStateHandle {

View File

@ -5,6 +5,7 @@ use std::fmt::{self, Debug};
// crates
use async_trait::async_trait;
use futures::StreamExt;
use overwatch_rs::services::ServiceError;
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::oneshot;
@ -75,7 +76,7 @@ where
B: NetworkBackend + Send + 'static,
B::State: Send + Sync,
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
backend: <B as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
@ -85,7 +86,7 @@ where
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
service_state:
ServiceStateHandle {

View File

@ -16,7 +16,7 @@ use backends::{StorageSerde, StorageTransaction};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::RelayMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
use tracing::error;
/// Storage message that maps to [`StorageBackend`] trait
@ -273,14 +273,15 @@ impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
#[async_trait]
impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageService<Backend> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self {
backend: Backend::new(service_state.settings_reader.get_updated_settings())?,
backend: Backend::new(service_state.settings_reader.get_updated_settings())
.map_err(|e| ServiceError::Service(Box::new(e)))?,
service_state,
})
}
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
async fn run(mut self) -> Result<(), ServiceError> {
let Self {
mut backend,
service_state:

View File

@ -9,8 +9,7 @@ use overwatch_rs::services::handle::ServiceStateHandle;
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::relay::NoMessage;
use overwatch_rs::services::state::{NoOperator, NoState};
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_rs::DynError;
use overwatch_rs::services::{ServiceCore, ServiceData, ServiceError, ServiceId};
pub struct SystemSig {
service_state: ServiceStateHandle<Self>,
@ -48,13 +47,14 @@ impl ServiceData for SystemSig {
#[async_trait::async_trait]
impl ServiceCore for SystemSig {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, DynError> {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, ServiceError> {
Ok(Self { service_state })
}
async fn run(self) -> Result<(), DynError> {
async fn run(self) -> Result<(), ServiceError> {
let Self { service_state } = self;
let mut ctrlc = async_ctrlc::CtrlC::new()?;
let mut ctrlc =
async_ctrlc::CtrlC::new().map_err(|e| ServiceError::Service(Box::new(e)))?;
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
tokio::select! {