From a343249d928e0f3e4cd34fe8b8516a43d4145b47 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Fri, 18 Nov 2022 09:46:54 +0100 Subject: [PATCH] Storage service (#6) * Added storage main files and trait * Pipe skeleton for StorageService * Implement running StorageService * Add sled module * Refactor error reporting * Missing logging todo * Implement mock storage * Add channel for auto-converting types on channel replies * Simplify StorageBackend trait * Refactor mock backend and use HashMap * Remove sled for now * Refactor serialization scheme * Add storage traits docs * Make transaction a custom trait. This way we can return stuff from the transactions themselves if needed. --- nomos-services/Cargo.toml | 11 +- nomos-services/src/lib.rs | 1 + nomos-services/src/network/backends/mod.rs | 2 +- nomos-services/src/storage/backends/mock.rs | 60 +++++ nomos-services/src/storage/backends/mod.rs | 49 ++++ nomos-services/src/storage/mod.rs | 278 ++++++++++++++++++++ 6 files changed, 398 insertions(+), 3 deletions(-) create mode 100644 nomos-services/src/storage/backends/mock.rs create mode 100644 nomos-services/src/storage/backends/mod.rs create mode 100644 nomos-services/src/storage/mod.rs diff --git a/nomos-services/Cargo.toml b/nomos-services/Cargo.toml index f9e261b2..3c00d4e4 100644 --- a/nomos-services/Cargo.toml +++ b/nomos-services/Cargo.toml @@ -6,7 +6,14 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" } async-trait = "0.1" +bytes = "1.2" +overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +serde = "1.0" tokio = { version = "1", features = ["sync"] } -tracing = "0.1" \ No newline at end of file +thiserror = "1.0" +tracing = "0.1" + +[features] +default = [] +mock = [] \ No newline at end of file diff --git a/nomos-services/src/lib.rs b/nomos-services/src/lib.rs index a61610bd..f00a4874 100644 --- a/nomos-services/src/lib.rs +++ b/nomos-services/src/lib.rs @@ -1 +1,2 @@ pub mod network; +pub mod storage; \ No newline at end of file diff --git a/nomos-services/src/network/backends/mod.rs b/nomos-services/src/network/backends/mod.rs index 3221c6e1..aaaed677 100644 --- a/nomos-services/src/network/backends/mod.rs +++ b/nomos-services/src/network/backends/mod.rs @@ -4,7 +4,7 @@ use tokio::sync::broadcast::Receiver; pub trait NetworkBackend { type Config: Clone + Send + Sync + 'static; - type State: ServiceState + Clone;; + type State: ServiceState + Clone; fn new(config: Self::Config) -> Self; fn broadcast(&self, msg: NetworkData); diff --git a/nomos-services/src/storage/backends/mock.rs b/nomos-services/src/storage/backends/mock.rs new file mode 100644 index 00000000..72af7cf0 --- /dev/null +++ b/nomos-services/src/storage/backends/mock.rs @@ -0,0 +1,60 @@ +use std::collections::HashMap; +use std::marker::PhantomData; +// std +// crates +use crate::storage::backends::{StorageSerde, StorageTransaction}; +use async_trait::async_trait; +use bytes::Bytes; +use thiserror::Error; +// internal +use super::StorageBackend; + +#[derive(Debug, Error)] +#[error("Errors in MockStorage should not happen")] +pub enum MockStorageError {} + +pub type MockStorageTransaction = Box) + Send + Sync>; + +impl StorageTransaction for MockStorageTransaction { + type Result = (); + type Transaction = Self; +} + +// +pub struct MockStorage { + inner: HashMap, + _serde_op: PhantomData, +} + +#[async_trait] +impl StorageBackend for MockStorage { + type Settings = (); + type Error = MockStorageError; + type Transaction = MockStorageTransaction; + type SerdeOperator = SerdeOp; + + fn new(_config: Self::Settings) -> Self { + Self { + inner: HashMap::new(), + _serde_op: Default::default(), + } + } + + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { + let _ = self.inner.insert(key, value); + Ok(()) + } + + async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { + Ok(self.inner.get(key).map(|b| Bytes::copy_from_slice(b))) + } + + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { + Ok(self.inner.remove(key)) + } + + async fn execute(&mut self, transaction: Self::Transaction) -> Result<(), Self::Error> { + transaction(&mut self.inner); + Ok(()) + } +} diff --git a/nomos-services/src/storage/backends/mod.rs b/nomos-services/src/storage/backends/mod.rs new file mode 100644 index 00000000..5bdd782d --- /dev/null +++ b/nomos-services/src/storage/backends/mod.rs @@ -0,0 +1,49 @@ +#[cfg(feature = "mock")] +pub mod mock; + +// std +use std::error::Error; +// crates +use async_trait::async_trait; +use bytes::Bytes; +use serde::{de::DeserializeOwned, Serialize}; +// internal + +/// Trait that defines how to translate from user types to the storage buffer type +pub trait StorageSerde { + type Error: Error; + /// Dump a type as [`Bytes`] + fn serialize(value: T) -> Bytes; + /// Load a type from [`Bytes`] + fn deserialize(buff: Bytes) -> Result; +} + +/// Trait to abstract storage transactions return and operation types +pub trait StorageTransaction: Send + Sync { + type Result: Send + Sync; + type Transaction: Send + Sync; +} + +/// Main storage functionality trait +#[async_trait] +pub trait StorageBackend { + /// Backend settings + type Settings: Clone + Send + Sync + 'static; + /// Backend operations error type + type Error: Error + 'static; + /// Backend transaction type + /// Usually it will be some function that modifies the storage directly or operates + /// over the backend as per the backend specification. + type Transaction: StorageTransaction; + /// Operator to dump/load custom types into the defined backend store type [`Bytes`] + type SerdeOperator: StorageSerde + Send + Sync + 'static; + fn new(config: Self::Settings) -> Self; + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error>; + async fn load(&mut self, key: &[u8]) -> Result, Self::Error>; + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error>; + /// Execute a transaction in the current backend + async fn execute( + &mut self, + transaction: Self::Transaction, + ) -> Result<::Result, Self::Error>; +} diff --git a/nomos-services/src/storage/mod.rs b/nomos-services/src/storage/mod.rs new file mode 100644 index 00000000..646feb47 --- /dev/null +++ b/nomos-services/src/storage/mod.rs @@ -0,0 +1,278 @@ +pub mod backends; + +// std +use std::fmt::{Debug, Formatter}; +use std::marker::PhantomData; +// crates +use async_trait::async_trait; +use bytes::Bytes; +use overwatch::services::handle::ServiceStateHandle; +use serde::de::DeserializeOwned; +// internal +use crate::storage::backends::{StorageSerde, StorageTransaction}; +use backends::StorageBackend; +use overwatch::services::relay::RelayMessage; +use overwatch::services::state::{NoOperator, NoState}; +use overwatch::services::{ServiceCore, ServiceData, ServiceId}; +use serde::Serialize; + +/// Storage message that maps to [`StorageBackend`] trait +pub enum StorageMsg { + Load { + key: Bytes, + reply_channel: tokio::sync::oneshot::Sender>, + }, + Store { + key: Bytes, + value: Bytes, + }, + Remove { + key: Bytes, + reply_channel: tokio::sync::oneshot::Sender>, + }, + Execute { + transaction: Backend::Transaction, + reply_channel: + tokio::sync::oneshot::Sender<::Result>, + }, +} + +/// Reply channel for storage messages +pub struct StorageReplyReceiver { + channel: tokio::sync::oneshot::Receiver, + _backend: PhantomData, +} + +impl StorageReplyReceiver { + pub fn new(channel: tokio::sync::oneshot::Receiver) -> Self { + Self { + channel, + _backend: Default::default(), + } + } + + pub fn into_inner(self) -> tokio::sync::oneshot::Receiver { + self.channel + } +} + +impl StorageReplyReceiver { + /// Receive and transform the reply into the desired type + /// Target type must implement `From` from the original backend stored type. + pub async fn recv(self) -> Result + where + Output: DeserializeOwned, + { + self.channel + .await + // TODO: This should probably just return a result anyway. But for now we can consider in infallible. + .map(|b| { + Backend::SerdeOperator::deserialize(b) + .expect("Recovery from storage should never fail") + }) + } +} + +impl StorageMsg { + pub fn new_load_message( + key: K, + ) -> ( + StorageMsg, + StorageReplyReceiver, Backend>, + ) { + let key = Backend::SerdeOperator::serialize(key); + let (reply_channel, receiver) = tokio::sync::oneshot::channel(); + ( + Self::Load { key, reply_channel }, + StorageReplyReceiver::new(receiver), + ) + } + + pub fn new_store_message(key: K, value: V) -> StorageMsg { + let key = Backend::SerdeOperator::serialize(key); + let value = Backend::SerdeOperator::serialize(value); + StorageMsg::Store { key, value } + } + + pub fn new_remove_message( + key: K, + ) -> ( + StorageMsg, + StorageReplyReceiver, Backend>, + ) { + let key = Backend::SerdeOperator::serialize(key); + let (reply_channel, receiver) = tokio::sync::oneshot::channel(); + ( + Self::Remove { key, reply_channel }, + StorageReplyReceiver::new(receiver), + ) + } + + pub fn new_transaction_message( + transaction: Backend::Transaction, + ) -> ( + StorageMsg, + StorageReplyReceiver<::Result, Backend>, + ) { + let (reply_channel, receiver) = tokio::sync::oneshot::channel(); + ( + Self::Execute { + transaction, + reply_channel, + }, + StorageReplyReceiver::new(receiver), + ) + } +} + +// Implement `Debug` manually to avoid constraining `Backend` to `Debug` +impl Debug for StorageMsg { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + StorageMsg::Load { key, .. } => { + write!(f, "Load {{ {key:?} }}") + } + StorageMsg::Store { key, value } => { + write!(f, "Store {{ {key:?}, {value:?}}}") + } + StorageMsg::Remove { key, .. } => { + write!(f, "Remove {{ {key:?} }}") + } + StorageMsg::Execute { .. } => write!(f, "Execute transaction"), + } + } +} + +impl RelayMessage for StorageMsg {} + +/// Storage error +/// Errors that may happen when performing storage operations +#[derive(Debug, thiserror::Error)] +enum StorageServiceError { + #[error("Couldn't send a reply for operation `{operation}` with key [{key:?}]")] + ReplyError { operation: String, key: Bytes }, + #[error("Storage backend error")] + BackendError(#[source] Backend::Error), +} + +/// Storage service that wraps a [`StorageBackend`] +pub struct StorageService { + backend: Backend, + service_state: ServiceStateHandle, +} + +impl StorageService { + /// Handle load message + async fn handle_load( + backend: &mut Backend, + key: Bytes, + reply_channel: tokio::sync::oneshot::Sender>, + ) -> Result<(), StorageServiceError> { + let result: Option = backend + .load(&key) + .await + .map_err(StorageServiceError::BackendError)?; + reply_channel + .send(result) + .map_err(|_| StorageServiceError::ReplyError { + operation: "Load".to_string(), + key, + }) + } + + /// Handle remove message + async fn handle_remove( + backend: &mut Backend, + key: Bytes, + reply_channel: tokio::sync::oneshot::Sender>, + ) -> Result<(), StorageServiceError> { + let result: Option = backend + .remove(&key) + .await + .map_err(StorageServiceError::BackendError)?; + reply_channel + .send(result) + .map_err(|_| StorageServiceError::ReplyError { + operation: "Remove".to_string(), + key, + }) + } + + /// Handle store message + async fn handle_store( + backend: &mut Backend, + key: Bytes, + value: Bytes, + ) -> Result<(), StorageServiceError> { + backend + .store(key, value) + .await + .map_err(StorageServiceError::BackendError) + } + + /// Handle execute message + async fn handle_execute( + backend: &mut Backend, + transaction: Backend::Transaction, + reply_channel: tokio::sync::oneshot::Sender< + ::Result, + >, + ) -> Result<(), StorageServiceError> { + let result = backend + .execute(transaction) + .await + .map_err(StorageServiceError::BackendError)?; + reply_channel + .send(result) + .map_err(|_| StorageServiceError::ReplyError { + operation: "Execute".to_string(), + key: Bytes::new(), + }) + } +} + +#[async_trait] +impl ServiceCore for StorageService { + fn init(mut service_state: ServiceStateHandle) -> Self { + Self { + backend: Backend::new(service_state.settings_reader.get_updated_settings()), + service_state, + } + } + + async fn run(mut self) { + let Self { + mut backend, + service_state: ServiceStateHandle { + mut inbound_relay, .. + }, + } = self; + let backend = &mut backend; + while let Some(msg) = inbound_relay.recv().await { + if let Err(e) = match msg { + StorageMsg::Load { key, reply_channel } => { + Self::handle_load(backend, key, reply_channel).await + } + StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await, + StorageMsg::Remove { key, reply_channel } => { + Self::handle_remove(backend, key, reply_channel).await + } + StorageMsg::Execute { + transaction, + reply_channel, + } => Self::handle_execute(backend, transaction, reply_channel).await, + } { + // TODO: add proper logging + println!("{e}"); + } + } + } +} + +impl ServiceData for StorageService { + const SERVICE_ID: ServiceId = "Storage"; + type Settings = Backend::Settings; + type State = NoState; + type StateOperator = NoOperator; + type Message = StorageMsg; +}