adapt to overwatch error handle (#24)

This commit is contained in:
Al Liu 2022-12-12 18:06:15 +08:00 committed by GitHub
parent ee0085e873
commit 8c1a521440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 41 deletions

View File

@ -68,7 +68,7 @@ macro_rules! registry_init {
#[async_trait::async_trait]
impl ServiceCore for Logger {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let config = service_state.settings_reader.get_updated_settings();
let (non_blocking, _guard) = match config.backend {
LoggerBackend::Gelf { addr } => {
@ -78,7 +78,7 @@ impl ServiceCore for Logger {
.runtime()
.spawn(async move { task.connect().await });
registry_init!(layer, config.format, config.level);
return Self(None);
return Ok(Self(None));
}
LoggerBackend::File { directory, prefix } => {
let file_appender = tracing_appender::rolling::hourly(
@ -95,12 +95,12 @@ impl ServiceCore for Logger {
.with_level(true)
.with_writer(non_blocking);
registry_init!(layer, config.format, config.level);
Self(Some(_guard))
Ok(Self(Some(_guard)))
}
async fn run(self) {
async fn run(self) -> Result<(), overwatch_rs::DynError> {
// keep the handle alive without stressing the runtime
futures::pending!()
Ok(futures::pending!())
}
}

View File

@ -15,11 +15,11 @@ sled = { version = "0.34", optional = true }
tokio = { version = "1", features = ["sync"] }
thiserror = "1.0"
tracing = "0.1"
waku = { git = "https://github.com/waku-org/waku-rust-bindings", optional = true }
waku-bindings = { version = "0.1.0-beta1", optional = true }
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["json"] }
tracing-gelf = "0.7"
futures = "0.3"
[features]
default = ["waku"]
default = ["waku-bindings"]

View File

@ -1,5 +1,5 @@
use super::*;
use ::waku::*;
use ::waku_bindings::*;
use overwatch_rs::services::state::NoState;
use serde::{Deserialize, Serialize};
use tokio::sync::{

View File

@ -66,16 +66,16 @@ impl<B: NetworkBackend + Send + 'static> ServiceData for NetworkService<B> {
#[async_trait]
impl<B: NetworkBackend + Send + 'static> ServiceCore for NetworkService<B> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
Ok(Self {
backend: <B as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
),
service_state,
}
})
}
async fn run(mut self) {
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
service_state: ServiceStateHandle {
mut inbound_relay, ..
@ -100,6 +100,7 @@ impl<B: NetworkBackend + Send + 'static> ServiceCore for NetworkService<B> {
}),
}
}
Ok(())
}
}
@ -121,10 +122,9 @@ impl<B: NetworkBackend> Clone for NetworkState<B> {
impl<B: NetworkBackend + Send + 'static> ServiceState for NetworkState<B> {
type Settings = NetworkConfig<B>;
type Error = <B::State as ServiceState>::Error;
fn from_settings(settings: &Self::Settings) -> Self {
Self {
_backend: B::State::from_settings(&settings.backend),
}
fn from_settings(settings: &Self::Settings) -> Result<Self, Self::Error> {
B::State::from_settings(&settings.backend).map(|_backend| Self { _backend })
}
}

View File

@ -25,6 +25,12 @@ pub struct MockStorage<SerdeOp> {
_serde_op: PhantomData<SerdeOp>,
}
impl<SerdeOp> core::fmt::Debug for MockStorage<SerdeOp> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
format!("MockStorage {{ inner: {:?} }}", self.inner).fmt(f)
}
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for MockStorage<SerdeOp> {
type Settings = ();
@ -32,11 +38,11 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for MockStora
type Transaction = MockStorageTransaction;
type SerdeOperator = SerdeOp;
fn new(_config: Self::Settings) -> Self {
Self {
fn new(_config: Self::Settings) -> Result<Self, Self::Error> {
Ok(Self {
inner: HashMap::new(),
_serde_op: Default::default(),
}
})
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
@ -45,7 +51,7 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for MockStora
}
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
Ok(self.inner.get(key).map(|b| b.clone()))
Ok(self.inner.get(key).cloned())
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {

View File

@ -28,18 +28,18 @@ pub trait StorageTransaction: Send + Sync {
/// Main storage functionality trait
#[async_trait]
pub trait StorageBackend {
pub trait StorageBackend: Sized {
/// Backend settings
type Settings: Clone + Send + Sync + 'static;
/// Backend operations error type
type Error: Error + 'static;
type Error: Error + 'static + Send + Sync;
/// Backend transaction type
/// Usually it will be some function that modifies the storage directly or operates
/// over the backend as per the backend specification.
type Transaction: StorageTransaction;
/// Operator to dump/load custom types into the defined backend store type [`Bytes`]
type SerdeOperator: StorageSerde + Send + Sync + 'static;
fn new(config: Self::Settings) -> Self;
fn new(config: Self::Settings) -> Result<Self, Self::Error>;
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error>;
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;

View File

@ -10,6 +10,14 @@ use sled::transaction::{
// internal
use super::{StorageBackend, StorageSerde, StorageTransaction};
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Transaction(#[from] TransactionError),
#[error(transparent)]
Error(#[from] sled::Error),
}
/// Sled backend setting
#[derive(Clone)]
pub struct SledBackendSettings {
@ -32,25 +40,30 @@ impl StorageTransaction for SledTransaction {
}
/// Sled storage backend
pub struct SledBackend<SerdeOp> {
sled: sled::Db,
_serde_op: PhantomData<SerdeOp>,
}
impl<SerdeOp> core::fmt::Debug for SledBackend<SerdeOp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
format!("SledBackend {{ sled: {:?} }}", self.sled).fmt(f)
}
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for SledBackend<SerdeOp> {
type Settings = SledBackendSettings;
type Error = TransactionError;
type Error = Error;
type Transaction = SledTransaction;
type SerdeOperator = SerdeOp;
fn new(config: Self::Settings) -> Self {
Self {
sled: sled::open(config.db_path)
// TODO: We should probably make initialization failable
.unwrap(),
fn new(config: Self::Settings) -> Result<Self, Self::Error> {
Ok(Self {
sled: sled::open(config.db_path)?,
_serde_op: Default::default(),
}
})
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
@ -81,7 +94,8 @@ mod test {
use tempfile::TempDir;
#[tokio::test]
async fn test_store_load_remove() -> Result<(), TransactionError> {
async fn test_store_load_remove(
) -> Result<(), <SledBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = SledBackendSettings {
db_path: temp_path.path().to_path_buf(),
@ -89,7 +103,7 @@ mod test {
let key = "foo";
let value = "bar";
let mut sled_db: SledBackend<NoStorageSerde> = SledBackend::new(sled_settings);
let mut sled_db: SledBackend<NoStorageSerde> = SledBackend::new(sled_settings)?;
sled_db
.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
@ -102,7 +116,8 @@ mod test {
}
#[tokio::test]
async fn test_transaction() -> Result<(), TransactionError> {
async fn test_transaction() -> Result<(), <SledBackend<NoStorageSerde> as StorageBackend>::Error>
{
let temp_path = TempDir::new().unwrap();
let sled_settings = SledBackendSettings {
@ -111,11 +126,11 @@ mod test {
let key = "foo";
let value = "bar";
let mut sled_db: SledBackend<NoStorageSerde> = SledBackend::new(sled_settings);
let mut sled_db: SledBackend<NoStorageSerde> = SledBackend::new(sled_settings)?;
let result = sled_db
.execute(Box::new(move |tx| {
let key = key.clone();
let value = value.clone();
let key = key;
let value = value;
tx.insert(key, value)?;
let result = tx.get(key)?;
tx.remove(key)?;

View File

@ -233,14 +233,14 @@ impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
#[async_trait]
impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageService<Backend> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
backend: Backend::new(service_state.settings_reader.get_updated_settings()),
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
Ok(Self {
backend: Backend::new(service_state.settings_reader.get_updated_settings())?,
service_state,
}
})
}
async fn run(mut self) {
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
let Self {
mut backend,
service_state: ServiceStateHandle {
@ -266,6 +266,7 @@ impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageSer
println!("{e}");
}
}
Ok(())
}
}