diff --git a/nomos-services/storage/Cargo.toml b/nomos-services/storage/Cargo.toml index b2a92743..a7d0468b 100644 --- a/nomos-services/storage/Cargo.toml +++ b/nomos-services/storage/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.2" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = "1.0" sled = { version = "0.34", optional = true } +rocksdb = { version = "0.22", optional = true } thiserror = "1.0" tracing = "0.1" @@ -24,3 +25,9 @@ tempfile = "3" default = [] mock = [] sled-backend = ["sled"] +rocksdb-backend = ["rocksdb"] + +[[bin]] +name = "rocks" +path = "src/bin/rocks.rs" +required-features = ["rocksdb-backend"] diff --git a/nomos-services/storage/src/backends/mod.rs b/nomos-services/storage/src/backends/mod.rs index ac640857..2594b9ca 100644 --- a/nomos-services/storage/src/backends/mod.rs +++ b/nomos-services/storage/src/backends/mod.rs @@ -3,6 +3,9 @@ pub mod mock; #[cfg(feature = "sled")] pub mod sled; +#[cfg(feature = "rocksdb")] +pub mod rocksdb; + // std use std::error::Error; // crates diff --git a/nomos-services/storage/src/backends/rocksdb.rs b/nomos-services/storage/src/backends/rocksdb.rs new file mode 100644 index 00000000..e97aa96d --- /dev/null +++ b/nomos-services/storage/src/backends/rocksdb.rs @@ -0,0 +1,252 @@ +// std +use std::path::PathBuf; +use std::{marker::PhantomData, sync::Arc}; +// crates +use async_trait::async_trait; +use bytes::Bytes; +pub use rocksdb::Error; +use rocksdb::{Options, DB}; +// internal +use super::{StorageBackend, StorageSerde, StorageTransaction}; + +/// Rocks backend setting +#[derive(Clone, Debug)] +pub struct RocksBackendSettings { + /// File path to the db file + pub db_path: PathBuf, + pub read_only: bool, + pub column_family: Option, +} + +/// Rocks transaction type +// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode. +// Thus, we cannot open the same db in two or more processes. +pub struct Transaction { + rocks: Arc, + #[allow(clippy::type_complexity)] + executor: Box Result, Error> + Send + Sync>, +} + +impl Transaction { + /// Execute a function over the transaction + pub fn execute(self) -> Result, Error> { + (self.executor)(&self.rocks) + } +} + +impl StorageTransaction for Transaction { + type Result = Result, Error>; + type Transaction = Self; +} + +/// Rocks storage backend + +pub struct RocksBackend { + rocks: Arc, + _serde_op: PhantomData, +} + +impl RocksBackend { + pub fn txn( + &self, + executor: impl FnOnce(&DB) -> Result, Error> + Send + Sync + 'static, + ) -> Transaction { + Transaction { + rocks: self.rocks.clone(), + executor: Box::new(executor), + } + } +} + +impl core::fmt::Debug for RocksBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f) + } +} + +#[async_trait] +impl StorageBackend for RocksBackend { + type Settings = RocksBackendSettings; + type Error = rocksdb::Error; + type Transaction = Transaction; + type SerdeOperator = SerdeOp; + + fn new(config: Self::Settings) -> Result { + let RocksBackendSettings { + db_path, + read_only, + column_family: cf, + } = config; + + let db = match (read_only, cf) { + (true, None) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_for_read_only(&opts, db_path, false)? + } + (true, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_cf_for_read_only(&opts, db_path, [cf], false)? + } + (false, None) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + DB::open(&opts, db_path)? + } + (false, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + DB::open_cf(&opts, db_path, [cf])? + } + }; + + Ok(Self { + rocks: Arc::new(db), + _serde_op: Default::default(), + }) + } + + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { + self.rocks.put(key, value) + } + + async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { + self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into())) + } + + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { + self.load(key).await.and_then(|val| { + if val.is_some() { + self.rocks.delete(key).map(|_| val) + } else { + Ok(None) + } + }) + } + + async fn execute( + &mut self, + transaction: Self::Transaction, + ) -> Result<::Result, Self::Error> { + Ok(transaction.execute()) + } +} + +#[cfg(test)] +mod test { + use super::super::testing::NoStorageSerde; + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_store_load_remove( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + let load_value = db.load(key.as_bytes()).await?; + assert_eq!(load_value, Some(value.as_bytes().into())); + let removed_value = db.remove(key.as_bytes()).await?; + assert_eq!(removed_value, Some(value.as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_transaction( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + let txn = db.txn(|db| { + let key = "foo"; + let value = "bar"; + db.put(key, value)?; + let result = db.get(key)?; + db.delete(key)?; + Ok(result.map(|ivec| ivec.to_vec().into())) + }); + let result = db.execute(txn).await??; + assert_eq!(result, Some("bar".as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_multi_readers_single_writer( + ) -> Result<(), as StorageBackend>::Error> { + use tokio::sync::mpsc::channel; + + let temp_path = TempDir::new().unwrap(); + let path = temp_path.path().to_path_buf(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + + let (tx, mut rx) = channel(5); + // now let us spawn a few readers + for _ in 0..5 { + let p = path.clone(); + let tx = tx.clone(); + std::thread::spawn(move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let sled_settings = RocksBackendSettings { + db_path: p, + read_only: true, + column_family: None, + }; + let key = "foo"; + + let mut db: RocksBackend = + RocksBackend::new(sled_settings).unwrap(); + + while db.load(key.as_bytes()).await.unwrap().is_none() { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + tx.send(()).await.unwrap(); + }); + }); + } + + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + + let mut recvs = 0; + loop { + if rx.recv().await.is_some() { + recvs += 1; + if recvs == 5 { + break; + } + } + } + Ok(()) + } +} diff --git a/nomos-services/storage/src/bin/rocks.rs b/nomos-services/storage/src/bin/rocks.rs new file mode 100644 index 00000000..bfd567e8 --- /dev/null +++ b/nomos-services/storage/src/bin/rocks.rs @@ -0,0 +1,58 @@ +use rocksdb::{Options, DB}; + +const TEMP_ROCKS_PATH: &str = "rocks"; + +pub fn rocksdb_ro() { + let mut opts = Options::default(); + opts.create_if_missing(true); + + // open in read only mode + let db = DB::open_cf_for_read_only(&opts, TEMP_ROCKS_PATH, ["blocks", "da"], false).unwrap(); + + let blocks_cf = db.cf_handle("blocks").unwrap(); + let r = db.get_cf(blocks_cf, b"block1").unwrap().unwrap(); + + assert_eq!(r, b"block1data"); + + let da_cf = db.cf_handle("da").unwrap(); + let r = db.get_cf(da_cf, b"da1").unwrap().unwrap(); + assert_eq!(r, b"da1data"); + + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +pub fn rocksdb_rw() { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let db = DB::open_cf(&opts, TEMP_ROCKS_PATH, ["blocks", "da"]).unwrap(); + + // open blocks column family and insert a block + let blocks_cf = db.cf_handle("blocks").unwrap(); + db.put_cf(blocks_cf, b"block1", b"block1data").unwrap(); + + // open da column family and insert a blob + let da_cf = db.cf_handle("da").unwrap(); + db.put_cf(da_cf, b"da1", b"da1data").unwrap(); + + // A loop to mock a long running program + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +fn main() { + let mut args = std::env::args(); + args.next(); + let o = args.next(); + if o.is_none() { + println!("open in read-write mode"); + rocksdb_rw() + } else { + println!("open in read-only mode"); + rocksdb_ro() + } +}