diff --git a/nomos-services/Cargo.toml b/nomos-services/Cargo.toml index b1976139..c3f0d82e 100644 --- a/nomos-services/Cargo.toml +++ b/nomos-services/Cargo.toml @@ -9,14 +9,19 @@ edition = "2021" async-trait = "0.1" bytes = "1.2" overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +multiaddr = "0.15" serde = "1.0" +sled = { version = "0.34", optional = true } tokio = { version = "1", features = ["sync"] } thiserror = "1.0" tracing = "0.1" waku = { git = "https://github.com/waku-org/waku-rust-bindings" } -multiaddr = "0.15" +[dev-dependencies] +tempfile = "3.3" +tokio = { version = "1", features = ["sync", "macros", "time"] } [features] default = [] mock = [] +sled-backend = ["sled"] \ No newline at end of file diff --git a/nomos-services/src/storage/backends/mod.rs b/nomos-services/src/storage/backends/mod.rs index c78e9f29..bfa09a00 100644 --- a/nomos-services/src/storage/backends/mod.rs +++ b/nomos-services/src/storage/backends/mod.rs @@ -1,5 +1,7 @@ #[cfg(feature = "mock")] pub mod mock; +#[cfg(feature = "sled")] +pub mod sled; // std use std::error::Error; diff --git a/nomos-services/src/storage/backends/sled.rs b/nomos-services/src/storage/backends/sled.rs new file mode 100644 index 00000000..7d9f4ef0 --- /dev/null +++ b/nomos-services/src/storage/backends/sled.rs @@ -0,0 +1,130 @@ +// std +use std::marker::PhantomData; +use std::path::PathBuf; +// crates +use async_trait::async_trait; +use bytes::Bytes; +use sled::transaction::{ + ConflictableTransactionResult, TransactionError, TransactionResult, TransactionalTree, +}; +// internal +use super::StorageBackend; +use crate::storage::backends::{StorageSerde, StorageTransaction}; + +/// Sled backend setting +#[derive(Clone)] +pub struct SledBackendSettings { + /// File path to the db file + db_path: PathBuf, +} + +/// Sled transaction type +/// Function that takes a reference to the transactional tree. No `&mut` needed as sled operations +/// work over simple `&`. +pub type SledTransaction = Box< + dyn Fn(&TransactionalTree) -> ConflictableTransactionResult, sled::Error> + + Send + + Sync, +>; + +impl StorageTransaction for SledTransaction { + type Result = TransactionResult, sled::Error>; + type Transaction = Self; +} + +/// Sled storage backend +pub struct SledBackend { + sled: sled::Db, + _serde_op: PhantomData, +} + +#[async_trait] +impl StorageBackend for SledBackend { + type Settings = SledBackendSettings; + type Error = TransactionError; + type Transaction = SledTransaction; + type SerdeOperator = SerdeOp; + + fn new(config: Self::Settings) -> Self { + Self { + sled: sled::open(config.db_path) + // TODO: We should probably make initialization failable + .unwrap(), + _serde_op: Default::default(), + } + } + + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { + let _ = self.sled.insert(key, value.to_vec())?; + Ok(()) + } + + async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { + Ok(self.sled.get(key)?.map(|ivec| ivec.to_vec().into())) + } + + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { + Ok(self.sled.remove(key)?.map(|ivec| ivec.to_vec().into())) + } + + async fn execute( + &mut self, + transaction: Self::Transaction, + ) -> Result<::Result, Self::Error> { + Ok(self.sled.transaction(transaction)) + } +} + +#[cfg(test)] +mod test { + use super::super::testing::NoStorageSerde; + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_store_load_remove() -> Result<(), TransactionError> { + let temp_path = TempDir::new().unwrap(); + let sled_settings = SledBackendSettings { + db_path: temp_path.path().to_path_buf(), + }; + let key = "foo"; + let value = "bar"; + + let mut sled_db: SledBackend = SledBackend::new(sled_settings); + sled_db + .store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + let load_value = sled_db.load(key.as_bytes()).await?; + assert_eq!(load_value, Some(value.as_bytes().into())); + let removed_value = sled_db.remove(key.as_bytes()).await?; + assert_eq!(removed_value, Some(value.as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_transaction() -> Result<(), TransactionError> { + let temp_path = TempDir::new().unwrap(); + + let sled_settings = SledBackendSettings { + db_path: temp_path.path().to_path_buf(), + }; + let key = "foo"; + let value = "bar"; + + let mut sled_db: SledBackend = SledBackend::new(sled_settings); + let result = sled_db + .execute(Box::new(move |tx| { + let key = key.clone(); + let value = value.clone(); + tx.insert(key, value)?; + let result = tx.get(key)?; + tx.remove(key)?; + Ok(result.map(|ivec| ivec.to_vec().into())) + })) + .await??; + assert_eq!(result, Some(value.as_bytes().into())); + + Ok(()) + } +}