1
0
mirror of synced 2025-01-11 00:05:48 +00:00

RocksDB Storage service (#584)

* finish rocksdb storage service
This commit is contained in:
Al Liu 2024-02-29 12:32:45 +08:00 committed by GitHub
parent bafc6dfb1f
commit fde0d29860
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 320 additions and 0 deletions

View File

@ -13,6 +13,7 @@ bytes = "1.2"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
serde = "1.0"
sled = { version = "0.34", optional = true }
rocksdb = { version = "0.22", optional = true }
thiserror = "1.0"
tracing = "0.1"
@ -24,3 +25,9 @@ tempfile = "3"
default = []
mock = []
sled-backend = ["sled"]
rocksdb-backend = ["rocksdb"]
[[bin]]
name = "rocks"
path = "src/bin/rocks.rs"
required-features = ["rocksdb-backend"]

View File

@ -3,6 +3,9 @@ pub mod mock;
#[cfg(feature = "sled")]
pub mod sled;
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
// std
use std::error::Error;
// crates

View File

@ -0,0 +1,252 @@
// std
use std::path::PathBuf;
use std::{marker::PhantomData, sync::Arc};
// crates
use async_trait::async_trait;
use bytes::Bytes;
pub use rocksdb::Error;
use rocksdb::{Options, DB};
// internal
use super::{StorageBackend, StorageSerde, StorageTransaction};
/// Rocks backend setting
#[derive(Clone, Debug)]
pub struct RocksBackendSettings {
/// File path to the db file
pub db_path: PathBuf,
pub read_only: bool,
pub column_family: Option<String>,
}
/// Rocks transaction type
// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode.
// Thus, we cannot open the same db in two or more processes.
pub struct Transaction {
rocks: Arc<DB>,
#[allow(clippy::type_complexity)]
executor: Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync>,
}
impl Transaction {
/// Execute a function over the transaction
pub fn execute(self) -> Result<Option<Bytes>, Error> {
(self.executor)(&self.rocks)
}
}
impl StorageTransaction for Transaction {
type Result = Result<Option<Bytes>, Error>;
type Transaction = Self;
}
/// Rocks storage backend
pub struct RocksBackend<SerdeOp> {
rocks: Arc<DB>,
_serde_op: PhantomData<SerdeOp>,
}
impl<SerdeOp> RocksBackend<SerdeOp> {
pub fn txn(
&self,
executor: impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static,
) -> Transaction {
Transaction {
rocks: self.rocks.clone(),
executor: Box::new(executor),
}
}
}
impl<SerdeOp> core::fmt::Debug for RocksBackend<SerdeOp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f)
}
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for RocksBackend<SerdeOp> {
type Settings = RocksBackendSettings;
type Error = rocksdb::Error;
type Transaction = Transaction;
type SerdeOperator = SerdeOp;
fn new(config: Self::Settings) -> Result<Self, Self::Error> {
let RocksBackendSettings {
db_path,
read_only,
column_family: cf,
} = config;
let db = match (read_only, cf) {
(true, None) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_for_read_only(&opts, db_path, false)?
}
(true, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_cf_for_read_only(&opts, db_path, [cf], false)?
}
(false, None) => {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
DB::open(&opts, db_path)?
}
(false, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
DB::open_cf(&opts, db_path, [cf])?
}
};
Ok(Self {
rocks: Arc::new(db),
_serde_op: Default::default(),
})
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
self.rocks.put(key, value)
}
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into()))
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.load(key).await.and_then(|val| {
if val.is_some() {
self.rocks.delete(key).map(|_| val)
} else {
Ok(None)
}
})
}
async fn execute(
&mut self,
transaction: Self::Transaction,
) -> Result<<Self::Transaction as StorageTransaction>::Result, Self::Error> {
Ok(transaction.execute())
}
}
#[cfg(test)]
mod test {
use super::super::testing::NoStorageSerde;
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_store_load_remove(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let load_value = db.load(key.as_bytes()).await?;
assert_eq!(load_value, Some(value.as_bytes().into()));
let removed_value = db.remove(key.as_bytes()).await?;
assert_eq!(removed_value, Some(value.as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_transaction(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let txn = db.txn(|db| {
let key = "foo";
let value = "bar";
db.put(key, value)?;
let result = db.get(key)?;
db.delete(key)?;
Ok(result.map(|ivec| ivec.to_vec().into()))
});
let result = db.execute(txn).await??;
assert_eq!(result, Some("bar".as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_multi_readers_single_writer(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
use tokio::sync::mpsc::channel;
let temp_path = TempDir::new().unwrap();
let path = temp_path.path().to_path_buf();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let (tx, mut rx) = channel(5);
// now let us spawn a few readers
for _ in 0..5 {
let p = path.clone();
let tx = tx.clone();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let sled_settings = RocksBackendSettings {
db_path: p,
read_only: true,
column_family: None,
};
let key = "foo";
let mut db: RocksBackend<NoStorageSerde> =
RocksBackend::new(sled_settings).unwrap();
while db.load(key.as_bytes()).await.unwrap().is_none() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
tx.send(()).await.unwrap();
});
});
}
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let mut recvs = 0;
loop {
if rx.recv().await.is_some() {
recvs += 1;
if recvs == 5 {
break;
}
}
}
Ok(())
}
}

View File

@ -0,0 +1,58 @@
use rocksdb::{Options, DB};
const TEMP_ROCKS_PATH: &str = "rocks";
pub fn rocksdb_ro() {
let mut opts = Options::default();
opts.create_if_missing(true);
// open in read only mode
let db = DB::open_cf_for_read_only(&opts, TEMP_ROCKS_PATH, ["blocks", "da"], false).unwrap();
let blocks_cf = db.cf_handle("blocks").unwrap();
let r = db.get_cf(blocks_cf, b"block1").unwrap().unwrap();
assert_eq!(r, b"block1data");
let da_cf = db.cf_handle("da").unwrap();
let r = db.get_cf(da_cf, b"da1").unwrap().unwrap();
assert_eq!(r, b"da1data");
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn rocksdb_rw() {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, TEMP_ROCKS_PATH, ["blocks", "da"]).unwrap();
// open blocks column family and insert a block
let blocks_cf = db.cf_handle("blocks").unwrap();
db.put_cf(blocks_cf, b"block1", b"block1data").unwrap();
// open da column family and insert a blob
let da_cf = db.cf_handle("da").unwrap();
db.put_cf(da_cf, b"da1", b"da1data").unwrap();
// A loop to mock a long running program
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
fn main() {
let mut args = std::env::args();
args.next();
let o = args.next();
if o.is_none() {
println!("open in read-write mode");
rocksdb_rw()
} else {
println!("open in read-only mode");
rocksdb_ro()
}
}