From 8c1a5214403f4b6f526028756299d8895c05786a Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 12 Dec 2022 18:06:15 +0800 Subject: [PATCH] adapt to overwatch error handle (#24) --- nomos-services/log/src/lib.rs | 10 ++--- nomos-services/network/Cargo.toml | 4 +- nomos-services/network/src/backends/waku.rs | 2 +- nomos-services/network/src/lib.rs | 16 ++++---- nomos-services/storage/src/backends/mock.rs | 14 +++++-- nomos-services/storage/src/backends/mod.rs | 6 +-- nomos-services/storage/src/backends/sled.rs | 41 ++++++++++++++------- nomos-services/storage/src/lib.rs | 11 +++--- 8 files changed, 63 insertions(+), 41 deletions(-) diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index e369c79c..7ba7583d 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -68,7 +68,7 @@ macro_rules! registry_init { #[async_trait::async_trait] impl ServiceCore for Logger { - fn init(mut service_state: ServiceStateHandle) -> Self { + fn init(service_state: ServiceStateHandle) -> Result { let config = service_state.settings_reader.get_updated_settings(); let (non_blocking, _guard) = match config.backend { LoggerBackend::Gelf { addr } => { @@ -78,7 +78,7 @@ impl ServiceCore for Logger { .runtime() .spawn(async move { task.connect().await }); registry_init!(layer, config.format, config.level); - return Self(None); + return Ok(Self(None)); } LoggerBackend::File { directory, prefix } => { let file_appender = tracing_appender::rolling::hourly( @@ -95,12 +95,12 @@ impl ServiceCore for Logger { .with_level(true) .with_writer(non_blocking); registry_init!(layer, config.format, config.level); - Self(Some(_guard)) + Ok(Self(Some(_guard))) } - async fn run(self) { + async fn run(self) -> Result<(), overwatch_rs::DynError> { // keep the handle alive without stressing the runtime - futures::pending!() + Ok(futures::pending!()) } } diff --git a/nomos-services/network/Cargo.toml b/nomos-services/network/Cargo.toml index 37266d9e..2a72213c 100644 --- a/nomos-services/network/Cargo.toml +++ b/nomos-services/network/Cargo.toml @@ -15,11 +15,11 @@ sled = { version = "0.34", optional = true } tokio = { version = "1", features = ["sync"] } thiserror = "1.0" tracing = "0.1" -waku = { git = "https://github.com/waku-org/waku-rust-bindings", optional = true } +waku-bindings = { version = "0.1.0-beta1", optional = true } tracing-appender = "0.2" tracing-subscriber = { version = "0.3", features = ["json"] } tracing-gelf = "0.7" futures = "0.3" [features] -default = ["waku"] \ No newline at end of file +default = ["waku-bindings"] \ No newline at end of file diff --git a/nomos-services/network/src/backends/waku.rs b/nomos-services/network/src/backends/waku.rs index ab3cac79..5f31d555 100644 --- a/nomos-services/network/src/backends/waku.rs +++ b/nomos-services/network/src/backends/waku.rs @@ -1,5 +1,5 @@ use super::*; -use ::waku::*; +use ::waku_bindings::*; use overwatch_rs::services::state::NoState; use serde::{Deserialize, Serialize}; use tokio::sync::{ diff --git a/nomos-services/network/src/lib.rs b/nomos-services/network/src/lib.rs index f4d304a1..c6f5c6fd 100644 --- a/nomos-services/network/src/lib.rs +++ b/nomos-services/network/src/lib.rs @@ -66,16 +66,16 @@ impl ServiceData for NetworkService { #[async_trait] impl ServiceCore for NetworkService { - fn init(mut service_state: ServiceStateHandle) -> Self { - Self { + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { backend: ::new( service_state.settings_reader.get_updated_settings().backend, ), service_state, - } + }) } - async fn run(mut self) { + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { service_state: ServiceStateHandle { mut inbound_relay, .. @@ -100,6 +100,7 @@ impl ServiceCore for NetworkService { }), } } + Ok(()) } } @@ -121,10 +122,9 @@ impl Clone for NetworkState { impl ServiceState for NetworkState { type Settings = NetworkConfig; + type Error = ::Error; - fn from_settings(settings: &Self::Settings) -> Self { - Self { - _backend: B::State::from_settings(&settings.backend), - } + fn from_settings(settings: &Self::Settings) -> Result { + B::State::from_settings(&settings.backend).map(|_backend| Self { _backend }) } } diff --git a/nomos-services/storage/src/backends/mock.rs b/nomos-services/storage/src/backends/mock.rs index 7c76feb5..5d142d56 100644 --- a/nomos-services/storage/src/backends/mock.rs +++ b/nomos-services/storage/src/backends/mock.rs @@ -25,6 +25,12 @@ pub struct MockStorage { _serde_op: PhantomData, } +impl core::fmt::Debug for MockStorage { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + format!("MockStorage {{ inner: {:?} }}", self.inner).fmt(f) + } +} + #[async_trait] impl StorageBackend for MockStorage { type Settings = (); @@ -32,11 +38,11 @@ impl StorageBackend for MockStora type Transaction = MockStorageTransaction; type SerdeOperator = SerdeOp; - fn new(_config: Self::Settings) -> Self { - Self { + fn new(_config: Self::Settings) -> Result { + Ok(Self { inner: HashMap::new(), _serde_op: Default::default(), - } + }) } async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { @@ -45,7 +51,7 @@ impl StorageBackend for MockStora } async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { - Ok(self.inner.get(key).map(|b| b.clone())) + Ok(self.inner.get(key).cloned()) } async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { diff --git a/nomos-services/storage/src/backends/mod.rs b/nomos-services/storage/src/backends/mod.rs index ce19fdce..4cc92bc5 100644 --- a/nomos-services/storage/src/backends/mod.rs +++ b/nomos-services/storage/src/backends/mod.rs @@ -28,18 +28,18 @@ pub trait StorageTransaction: Send + Sync { /// Main storage functionality trait #[async_trait] -pub trait StorageBackend { +pub trait StorageBackend: Sized { /// Backend settings type Settings: Clone + Send + Sync + 'static; /// Backend operations error type - type Error: Error + 'static; + type Error: Error + 'static + Send + Sync; /// Backend transaction type /// Usually it will be some function that modifies the storage directly or operates /// over the backend as per the backend specification. type Transaction: StorageTransaction; /// Operator to dump/load custom types into the defined backend store type [`Bytes`] type SerdeOperator: StorageSerde + Send + Sync + 'static; - fn new(config: Self::Settings) -> Self; + fn new(config: Self::Settings) -> Result; async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error>; async fn load(&mut self, key: &[u8]) -> Result, Self::Error>; async fn remove(&mut self, key: &[u8]) -> Result, Self::Error>; diff --git a/nomos-services/storage/src/backends/sled.rs b/nomos-services/storage/src/backends/sled.rs index 278434c1..29d321ae 100644 --- a/nomos-services/storage/src/backends/sled.rs +++ b/nomos-services/storage/src/backends/sled.rs @@ -10,6 +10,14 @@ use sled::transaction::{ // internal use super::{StorageBackend, StorageSerde, StorageTransaction}; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Transaction(#[from] TransactionError), + #[error(transparent)] + Error(#[from] sled::Error), +} + /// Sled backend setting #[derive(Clone)] pub struct SledBackendSettings { @@ -32,25 +40,30 @@ impl StorageTransaction for SledTransaction { } /// Sled storage backend + pub struct SledBackend { sled: sled::Db, _serde_op: PhantomData, } +impl core::fmt::Debug for SledBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + format!("SledBackend {{ sled: {:?} }}", self.sled).fmt(f) + } +} + #[async_trait] impl StorageBackend for SledBackend { type Settings = SledBackendSettings; - type Error = TransactionError; + type Error = Error; type Transaction = SledTransaction; type SerdeOperator = SerdeOp; - fn new(config: Self::Settings) -> Self { - Self { - sled: sled::open(config.db_path) - // TODO: We should probably make initialization failable - .unwrap(), + fn new(config: Self::Settings) -> Result { + Ok(Self { + sled: sled::open(config.db_path)?, _serde_op: Default::default(), - } + }) } async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { @@ -81,7 +94,8 @@ mod test { use tempfile::TempDir; #[tokio::test] - async fn test_store_load_remove() -> Result<(), TransactionError> { + async fn test_store_load_remove( + ) -> Result<(), as StorageBackend>::Error> { let temp_path = TempDir::new().unwrap(); let sled_settings = SledBackendSettings { db_path: temp_path.path().to_path_buf(), @@ -89,7 +103,7 @@ mod test { let key = "foo"; let value = "bar"; - let mut sled_db: SledBackend = SledBackend::new(sled_settings); + let mut sled_db: SledBackend = SledBackend::new(sled_settings)?; sled_db .store(key.as_bytes().into(), value.as_bytes().into()) .await?; @@ -102,7 +116,8 @@ mod test { } #[tokio::test] - async fn test_transaction() -> Result<(), TransactionError> { + async fn test_transaction() -> Result<(), as StorageBackend>::Error> + { let temp_path = TempDir::new().unwrap(); let sled_settings = SledBackendSettings { @@ -111,11 +126,11 @@ mod test { let key = "foo"; let value = "bar"; - let mut sled_db: SledBackend = SledBackend::new(sled_settings); + let mut sled_db: SledBackend = SledBackend::new(sled_settings)?; let result = sled_db .execute(Box::new(move |tx| { - let key = key.clone(); - let value = value.clone(); + let key = key; + let value = value; tx.insert(key, value)?; let result = tx.get(key)?; tx.remove(key)?; diff --git a/nomos-services/storage/src/lib.rs b/nomos-services/storage/src/lib.rs index c3ac3893..5888e427 100644 --- a/nomos-services/storage/src/lib.rs +++ b/nomos-services/storage/src/lib.rs @@ -233,14 +233,14 @@ impl StorageService { #[async_trait] impl ServiceCore for StorageService { - fn init(mut service_state: ServiceStateHandle) -> Self { - Self { - backend: Backend::new(service_state.settings_reader.get_updated_settings()), + fn init(service_state: ServiceStateHandle) -> Result { + Ok(Self { + backend: Backend::new(service_state.settings_reader.get_updated_settings())?, service_state, - } + }) } - async fn run(mut self) { + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { let Self { mut backend, service_state: ServiceStateHandle { @@ -266,6 +266,7 @@ impl ServiceCore for StorageSer println!("{e}"); } } + Ok(()) } }