Storage service (#6)

* Added storage main files and trait

* Pipe skeleton for StorageService

* Implement running StorageService

* Add sled module

* Refactor error reporting

* Missing logging todo

* Implement mock storage

* Add channel for auto-converting types on channel replies

* Simplify StorageBackend trait

* Refactor mock backend and use HashMap

* Remove sled for now

* Refactor serialization scheme

* Add storage traits docs

* Make transaction a custom trait. This way we can return stuff from the transactions themselves if needed.
This commit is contained in:
Daniel Sanchez 2022-11-18 09:46:54 +01:00 committed by GitHub
parent 0fcfd92a55
commit a343249d92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 398 additions and 3 deletions

View File

@ -6,7 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
async-trait = "0.1"
bytes = "1.2"
overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
serde = "1.0"
tokio = { version = "1", features = ["sync"] }
tracing = "0.1"
thiserror = "1.0"
tracing = "0.1"
[features]
default = []
mock = []

View File

@ -1 +1,2 @@
pub mod network;
pub mod storage;

View File

@ -4,7 +4,7 @@ use tokio::sync::broadcast::Receiver;
pub trait NetworkBackend {
type Config: Clone + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Config> + Clone;;
type State: ServiceState<Settings = Self::Config> + Clone;
fn new(config: Self::Config) -> Self;
fn broadcast(&self, msg: NetworkData);

View File

@ -0,0 +1,60 @@
use std::collections::HashMap;
use std::marker::PhantomData;
// std
// crates
use crate::storage::backends::{StorageSerde, StorageTransaction};
use async_trait::async_trait;
use bytes::Bytes;
use thiserror::Error;
// internal
use super::StorageBackend;
#[derive(Debug, Error)]
#[error("Errors in MockStorage should not happen")]
pub enum MockStorageError {}
pub type MockStorageTransaction = Box<dyn Fn(&mut HashMap<Bytes, Bytes>) + Send + Sync>;
impl StorageTransaction for MockStorageTransaction {
type Result = ();
type Transaction = Self;
}
//
pub struct MockStorage<SerdeOp> {
inner: HashMap<Bytes, Bytes>,
_serde_op: PhantomData<SerdeOp>,
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for MockStorage<SerdeOp> {
type Settings = ();
type Error = MockStorageError;
type Transaction = MockStorageTransaction;
type SerdeOperator = SerdeOp;
fn new(_config: Self::Settings) -> Self {
Self {
inner: HashMap::new(),
_serde_op: Default::default(),
}
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
let _ = self.inner.insert(key, value);
Ok(())
}
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
Ok(self.inner.get(key).map(|b| Bytes::copy_from_slice(b)))
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
Ok(self.inner.remove(key))
}
async fn execute(&mut self, transaction: Self::Transaction) -> Result<(), Self::Error> {
transaction(&mut self.inner);
Ok(())
}
}

View File

@ -0,0 +1,49 @@
#[cfg(feature = "mock")]
pub mod mock;
// std
use std::error::Error;
// crates
use async_trait::async_trait;
use bytes::Bytes;
use serde::{de::DeserializeOwned, Serialize};
// internal
/// Trait that defines how to translate from user types to the storage buffer type
pub trait StorageSerde {
type Error: Error;
/// Dump a type as [`Bytes`]
fn serialize<T: Serialize>(value: T) -> Bytes;
/// Load a type from [`Bytes`]
fn deserialize<T: DeserializeOwned>(buff: Bytes) -> Result<T, Self::Error>;
}
/// Trait to abstract storage transactions return and operation types
pub trait StorageTransaction: Send + Sync {
type Result: Send + Sync;
type Transaction: Send + Sync;
}
/// Main storage functionality trait
#[async_trait]
pub trait StorageBackend {
/// Backend settings
type Settings: Clone + Send + Sync + 'static;
/// Backend operations error type
type Error: Error + 'static;
/// Backend transaction type
/// Usually it will be some function that modifies the storage directly or operates
/// over the backend as per the backend specification.
type Transaction: StorageTransaction;
/// Operator to dump/load custom types into the defined backend store type [`Bytes`]
type SerdeOperator: StorageSerde + Send + Sync + 'static;
fn new(config: Self::Settings) -> Self;
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error>;
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error>;
/// Execute a transaction in the current backend
async fn execute(
&mut self,
transaction: Self::Transaction,
) -> Result<<Self::Transaction as StorageTransaction>::Result, Self::Error>;
}

View File

@ -0,0 +1,278 @@
pub mod backends;
// std
use std::fmt::{Debug, Formatter};
use std::marker::PhantomData;
// crates
use async_trait::async_trait;
use bytes::Bytes;
use overwatch::services::handle::ServiceStateHandle;
use serde::de::DeserializeOwned;
// internal
use crate::storage::backends::{StorageSerde, StorageTransaction};
use backends::StorageBackend;
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use serde::Serialize;
/// Storage message that maps to [`StorageBackend`] trait
pub enum StorageMsg<Backend: StorageBackend> {
Load {
key: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Option<Bytes>>,
},
Store {
key: Bytes,
value: Bytes,
},
Remove {
key: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Option<Bytes>>,
},
Execute {
transaction: Backend::Transaction,
reply_channel:
tokio::sync::oneshot::Sender<<Backend::Transaction as StorageTransaction>::Result>,
},
}
/// Reply channel for storage messages
pub struct StorageReplyReceiver<T, Backend> {
channel: tokio::sync::oneshot::Receiver<T>,
_backend: PhantomData<Backend>,
}
impl<T, Backend> StorageReplyReceiver<T, Backend> {
pub fn new(channel: tokio::sync::oneshot::Receiver<T>) -> Self {
Self {
channel,
_backend: Default::default(),
}
}
pub fn into_inner(self) -> tokio::sync::oneshot::Receiver<T> {
self.channel
}
}
impl<Backend: StorageBackend> StorageReplyReceiver<Bytes, Backend> {
/// Receive and transform the reply into the desired type
/// Target type must implement `From` from the original backend stored type.
pub async fn recv<Output>(self) -> Result<Output, tokio::sync::oneshot::error::RecvError>
where
Output: DeserializeOwned,
{
self.channel
.await
// TODO: This should probably just return a result anyway. But for now we can consider in infallible.
.map(|b| {
Backend::SerdeOperator::deserialize(b)
.expect("Recovery from storage should never fail")
})
}
}
impl<Backend: StorageBackend> StorageMsg<Backend> {
pub fn new_load_message<K: Serialize>(
key: K,
) -> (
StorageMsg<Backend>,
StorageReplyReceiver<Option<Bytes>, Backend>,
) {
let key = Backend::SerdeOperator::serialize(key);
let (reply_channel, receiver) = tokio::sync::oneshot::channel();
(
Self::Load { key, reply_channel },
StorageReplyReceiver::new(receiver),
)
}
pub fn new_store_message<K: Serialize, V: Serialize>(key: K, value: V) -> StorageMsg<Backend> {
let key = Backend::SerdeOperator::serialize(key);
let value = Backend::SerdeOperator::serialize(value);
StorageMsg::Store { key, value }
}
pub fn new_remove_message<K: Serialize>(
key: K,
) -> (
StorageMsg<Backend>,
StorageReplyReceiver<Option<Bytes>, Backend>,
) {
let key = Backend::SerdeOperator::serialize(key);
let (reply_channel, receiver) = tokio::sync::oneshot::channel();
(
Self::Remove { key, reply_channel },
StorageReplyReceiver::new(receiver),
)
}
pub fn new_transaction_message(
transaction: Backend::Transaction,
) -> (
StorageMsg<Backend>,
StorageReplyReceiver<<Backend::Transaction as StorageTransaction>::Result, Backend>,
) {
let (reply_channel, receiver) = tokio::sync::oneshot::channel();
(
Self::Execute {
transaction,
reply_channel,
},
StorageReplyReceiver::new(receiver),
)
}
}
// Implement `Debug` manually to avoid constraining `Backend` to `Debug`
impl<Backend: StorageBackend> Debug for StorageMsg<Backend> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
StorageMsg::Load { key, .. } => {
write!(f, "Load {{ {key:?} }}")
}
StorageMsg::Store { key, value } => {
write!(f, "Store {{ {key:?}, {value:?}}}")
}
StorageMsg::Remove { key, .. } => {
write!(f, "Remove {{ {key:?} }}")
}
StorageMsg::Execute { .. } => write!(f, "Execute transaction"),
}
}
}
impl<Backend: StorageBackend + 'static> RelayMessage for StorageMsg<Backend> {}
/// Storage error
/// Errors that may happen when performing storage operations
#[derive(Debug, thiserror::Error)]
enum StorageServiceError<Backend: StorageBackend> {
#[error("Couldn't send a reply for operation `{operation}` with key [{key:?}]")]
ReplyError { operation: String, key: Bytes },
#[error("Storage backend error")]
BackendError(#[source] Backend::Error),
}
/// Storage service that wraps a [`StorageBackend`]
pub struct StorageService<Backend: StorageBackend + Send + Sync + 'static> {
backend: Backend,
service_state: ServiceStateHandle<Self>,
}
impl<Backend: StorageBackend + Send + Sync + 'static> StorageService<Backend> {
/// Handle load message
async fn handle_load(
backend: &mut Backend,
key: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Option<Bytes>>,
) -> Result<(), StorageServiceError<Backend>> {
let result: Option<Bytes> = backend
.load(&key)
.await
.map_err(StorageServiceError::BackendError)?;
reply_channel
.send(result)
.map_err(|_| StorageServiceError::ReplyError {
operation: "Load".to_string(),
key,
})
}
/// Handle remove message
async fn handle_remove(
backend: &mut Backend,
key: Bytes,
reply_channel: tokio::sync::oneshot::Sender<Option<Bytes>>,
) -> Result<(), StorageServiceError<Backend>> {
let result: Option<Bytes> = backend
.remove(&key)
.await
.map_err(StorageServiceError::BackendError)?;
reply_channel
.send(result)
.map_err(|_| StorageServiceError::ReplyError {
operation: "Remove".to_string(),
key,
})
}
/// Handle store message
async fn handle_store(
backend: &mut Backend,
key: Bytes,
value: Bytes,
) -> Result<(), StorageServiceError<Backend>> {
backend
.store(key, value)
.await
.map_err(StorageServiceError::BackendError)
}
/// Handle execute message
async fn handle_execute(
backend: &mut Backend,
transaction: Backend::Transaction,
reply_channel: tokio::sync::oneshot::Sender<
<Backend::Transaction as StorageTransaction>::Result,
>,
) -> Result<(), StorageServiceError<Backend>> {
let result = backend
.execute(transaction)
.await
.map_err(StorageServiceError::BackendError)?;
reply_channel
.send(result)
.map_err(|_| StorageServiceError::ReplyError {
operation: "Execute".to_string(),
key: Bytes::new(),
})
}
}
#[async_trait]
impl<Backend: StorageBackend + Send + Sync + 'static> ServiceCore for StorageService<Backend> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
backend: Backend::new(service_state.settings_reader.get_updated_settings()),
service_state,
}
}
async fn run(mut self) {
let Self {
mut backend,
service_state: ServiceStateHandle {
mut inbound_relay, ..
},
} = self;
let backend = &mut backend;
while let Some(msg) = inbound_relay.recv().await {
if let Err(e) = match msg {
StorageMsg::Load { key, reply_channel } => {
Self::handle_load(backend, key, reply_channel).await
}
StorageMsg::Store { key, value } => Self::handle_store(backend, key, value).await,
StorageMsg::Remove { key, reply_channel } => {
Self::handle_remove(backend, key, reply_channel).await
}
StorageMsg::Execute {
transaction,
reply_channel,
} => Self::handle_execute(backend, transaction, reply_channel).await,
} {
// TODO: add proper logging
println!("{e}");
}
}
}
}
impl<Backend: StorageBackend + Send + Sync> ServiceData for StorageService<Backend> {
const SERVICE_ID: ServiceId = "Storage";
type Settings = Backend::Settings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = StorageMsg<Backend>;
}