lssa/storage/src/sequencer.rs

534 lines
18 KiB
Rust
Raw Normal View History

2026-02-03 11:36:07 +02:00
use std::{path::Path, sync::Arc};
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use nssa::V03State;
2026-02-03 11:36:07 +02:00
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
};
2026-03-25 16:14:33 +02:00
use crate::{
CF_BLOCK_NAME, CF_META_NAME, CF_NSSA_STATE_NAME, DB_META_FIRST_BLOCK_IN_DB_KEY,
DB_META_FIRST_BLOCK_SET_KEY, DB_META_LAST_FINALIZED_BLOCK_ID, DB_META_LATEST_BLOCK_META_KEY,
DB_NSSA_STATE_KEY,
error::DbError,
storable_cell::{SimpleStorableCell, cells::meta_shared::LastBlockCell},
};
2026-01-23 14:09:10 +02:00
/// Result alias used by the storage layer: `Ok(T)` on success, [`DbError`] on failure.
pub type DbResult<T> = Result<T, DbError>;
/// Storage handle wrapping a multi-threaded RocksDB instance.
pub struct RocksDBIO {
/// Underlying RocksDB database opened in `MultiThreaded` mode.
pub db: DBWithThreadMode<MultiThreaded>,
}
impl RocksDBIO {
pub fn open_or_create(
path: &Path,
2026-03-04 18:42:33 +03:00
genesis_block: &Block,
genesis_msg_id: MantleMsgId,
) -> DbResult<Self> {
2026-01-23 14:09:10 +02:00
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
2026-01-23 14:09:10 +02:00
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_opts,
path,
vec![cfb, cfmeta, cfstate],
2026-03-04 18:42:33 +03:00
)
.map_err(|err| DbError::RocksDbError {
error: err,
additional_info: Some("Failed to open or create DB".to_owned()),
})?;
2026-01-23 14:09:10 +02:00
2026-03-04 18:42:33 +03:00
let dbio = Self { db };
2026-01-23 14:09:10 +02:00
let is_start_set = dbio.get_meta_is_first_block_set()?;
2026-03-09 18:27:56 +03:00
if !is_start_set {
2026-03-04 18:42:33 +03:00
let block_id = genesis_block.header.block_id;
dbio.put_meta_first_block_in_db(genesis_block, genesis_msg_id)?;
2026-01-23 14:09:10 +02:00
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
2026-03-04 18:42:33 +03:00
id: genesis_block.header.block_id,
hash: genesis_block.header.hash,
msg_id: genesis_msg_id,
})?;
2026-01-23 14:09:10 +02:00
}
2026-03-09 18:27:56 +03:00
Ok(dbio)
2026-01-23 14:09:10 +02:00
}
pub fn destroy(path: &Path) -> DbResult<()> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
2026-01-23 14:09:10 +02:00
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
DBWithThreadMode::<MultiThreaded>::destroy(&db_opts, path)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
/// Returns the handle of the meta column family.
///
/// # Panics
///
/// Panics if the database has no `CF_META_NAME` column family. Databases
/// opened through `open_or_create` always register it.
pub fn meta_column(&self) -> Arc<BoundColumnFamily<'_>> {
    self.db
        .cf_handle(CF_META_NAME)
        .expect("meta column family must exist: it is registered when the DB is opened")
}
/// Returns the handle of the block column family.
///
/// # Panics
///
/// Panics if the database has no `CF_BLOCK_NAME` column family. Databases
/// opened through `open_or_create` always register it.
pub fn block_column(&self) -> Arc<BoundColumnFamily<'_>> {
    self.db
        .cf_handle(CF_BLOCK_NAME)
        .expect("block column family must exist: it is registered when the DB is opened")
}
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
2026-01-23 14:09:10 +02:00
}
2026-03-25 16:14:33 +02:00
// Generics
/// Reads a cell of type `T` from this database by delegating to `T::get`.
///
/// Any error reported by `T::get` is returned unchanged.
fn get<T: SimpleStorableCell>(&self) -> DbResult<T> {
T::get(&self.db)
}
/// Reads a cell of type `T` from this database by delegating to `T::get_opt`,
/// which yields an `Option<T>` instead of a bare `T`.
// NOTE(review): presumably `None` means the cell is absent — confirm against
// the `SimpleStorableCell` implementation.
#[expect(unused, reason = "Unused")]
fn get_opt<T: SimpleStorableCell>(&self) -> DbResult<Option<T>> {
T::get_opt(&self.db)
}
/// Stores `cell` in this database by delegating to the cell's own `put`.
///
/// Any error reported by `put` is returned unchanged.
fn put<T: SimpleStorableCell>(&self, cell: &T) -> DbResult<()> {
cell.put(&self.db)
}
/// Stages `cell` into `write_batch` by delegating to the cell's own `put_batch`.
///
/// This function does not write the batch itself; the caller is expected to
/// commit `write_batch` afterwards.
fn put_batch<T: SimpleStorableCell>(
&self,
cell: &T,
write_batch: &mut WriteBatch,
) -> DbResult<()> {
cell.put_batch(&self.db, write_batch)
}
2026-01-23 14:09:10 +02:00
/// Reads the id of the first block stored in the database from the meta column.
///
/// # Errors
///
/// Returns a [`DbError`] if the key cannot be serialized, RocksDB fails, the
/// stored value cannot be deserialized as `u64`, or no value is stored.
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
    let cf_meta = self.meta_column();
    let key = borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_owned()),
        )
    })?;

    let stored = self
        .db
        .get_cf(&cf_meta, key)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

    let Some(bytes) = stored else {
        return Err(DbError::db_interaction_error(
            "First block not found".to_owned(),
        ));
    };

    borsh::from_slice::<u64>(&bytes).map_err(|err| {
        DbError::borsh_cast_message(err, Some("Failed to deserialize first block".to_owned()))
    })
}
pub fn get_meta_last_block_in_db(&self) -> DbResult<u64> {
2026-03-25 16:14:33 +02:00
self.get::<LastBlockCell>().map(|cell| cell.0)
2026-01-23 14:09:10 +02:00
}
/// Reports whether the "first block set" marker is present in the meta column.
///
/// Only the presence of the key is checked; the stored value is not inspected.
///
/// # Errors
///
/// Returns a [`DbError`] if the key cannot be serialized or RocksDB fails.
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
    let cf_meta = self.meta_column();
    let key = borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_owned()),
        )
    })?;

    self.db
        .get_cf(&cf_meta, key)
        .map(|marker| marker.is_some())
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
pub fn put_nssa_state_in_db(&self, state: &V03State, batch: &mut WriteBatch) -> DbResult<()> {
let cf_nssa_state = self.nssa_state_column();
batch.put_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize DB_NSSA_STATE_KEY".to_owned()),
)
})?,
borsh::to_vec(state).map_err(|err| {
2026-03-04 18:42:33 +03:00
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_owned()))
})?,
);
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
2026-01-23 14:09:10 +02:00
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize DB_META_FIRST_BLOCK_IN_DB_KEY".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?,
borsh::to_vec(&block.header.block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize first block id".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
2026-03-04 18:42:33 +03:00
Some("Failed to write first block in db".to_owned()),
)
})?;
2026-01-23 14:09:10 +02:00
Ok(())
}
pub fn put_meta_last_block_in_db(&self, block_id: u64) -> DbResult<()> {
2026-03-25 16:14:33 +02:00
self.put(&LastBlockCell(block_id))
2026-01-23 14:09:10 +02:00
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
2026-03-25 16:14:33 +02:00
self.put_batch(&LastBlockCell(block_id), batch)
}
/// Stores the last finalized block id (`None` is stored as-is) in the meta column.
///
/// # Errors
///
/// Returns a [`DbError`] if serialization fails or RocksDB rejects the write.
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
    let cf_meta = self.meta_column();

    let key = borsh::to_vec(&DB_META_LAST_FINALIZED_BLOCK_ID).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize DB_META_LAST_FINALIZED_BLOCK_ID".to_owned()),
        )
    })?;
    let value = borsh::to_vec(&block_id).map_err(|err| {
        DbError::borsh_cast_message(err, Some("Failed to serialize last block id".to_owned()))
    })?;

    self.db
        .put_cf(&cf_meta, key, value)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
2026-01-23 14:09:10 +02:00
pub fn put_meta_is_first_block_set(&self) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_FIRST_BLOCK_SET_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize DB_META_FIRST_BLOCK_SET_KEY".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?,
2026-03-04 18:42:33 +03:00
[1_u8; 1],
2026-01-23 14:09:10 +02:00
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
/// Writes `block_meta` into the meta column under `DB_META_LATEST_BLOCK_META_KEY`.
///
/// The write goes straight to the database (no batch).
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
    let cf_meta = self.meta_column();

    let key = borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_owned()),
        )
    })?;
    let value = borsh::to_vec(block_meta).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize latest block meta".to_owned()),
        )
    })?;

    self.db
        .put_cf(&cf_meta, key, value)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_owned()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize latest block meta".to_owned()),
)
})?,
);
Ok(())
}
/// Reads the latest block meta from the meta column.
///
/// # Errors
///
/// Returns a [`DbError`] if the key cannot be serialized, RocksDB fails, the
/// stored bytes cannot be deserialized, or no value is stored.
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
    let cf_meta = self.meta_column();
    let key = borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
        DbError::borsh_cast_message(
            err,
            Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_owned()),
        )
    })?;

    let stored = self
        .db
        .get_cf(&cf_meta, key)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;

    match stored {
        Some(bytes) => borsh::from_slice::<BlockMeta>(&bytes).map_err(|err| {
            DbError::borsh_cast_message(
                err,
                Some("Failed to deserialize latest block meta".to_owned()),
            )
        }),
        None => Err(DbError::db_interaction_error(
            "Latest block meta not found".to_owned(),
        )),
    }
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
2026-01-23 14:09:10 +02:00
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
2026-01-23 14:09:10 +02:00
}
}
batch.put_cf(
&cf_block,
borsh::to_vec(&block.header.block_id).map_err(|err| {
2026-03-04 18:42:33 +03:00
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_owned()))
})?,
borsh::to_vec(block).map_err(|err| {
2026-03-04 18:42:33 +03:00
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_owned()))
})?,
);
2026-01-23 14:09:10 +02:00
Ok(())
}
pub fn get_block(&self, block_id: u64) -> DbResult<Option<Block>> {
2026-01-23 14:09:10 +02:00
let cf_block = self.block_column();
let res = self
.db
.get_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize block id".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(Some(borsh::from_slice::<Block>(&data).map_err(|serr| {
DbError::borsh_cast_message(
serr,
2026-03-04 18:42:33 +03:00
Some("Failed to deserialize block data".to_owned()),
)
})?))
2026-01-23 14:09:10 +02:00
} else {
Ok(None)
2026-01-23 14:09:10 +02:00
}
}
pub fn get_nssa_state(&self) -> DbResult<V03State> {
let cf_nssa_state = self.nssa_state_column();
2026-01-23 14:09:10 +02:00
let res = self
.db
.get_cf(
&cf_nssa_state,
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
2026-01-23 14:09:10 +02:00
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to serialize block id".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<V03State>(&data).map_err(|serr| {
2026-01-23 14:09:10 +02:00
DbError::borsh_cast_message(
serr,
2026-03-04 18:42:33 +03:00
Some("Failed to deserialize block data".to_owned()),
2026-01-23 14:09:10 +02:00
)
})?)
} else {
Err(DbError::db_interaction_error(
"NSSA state not found".to_owned(),
))
2026-01-23 14:09:10 +02:00
}
}
/// Deletes the block stored under `block_id`.
///
/// The block's presence is checked first, so deleting a missing block is
/// reported as an error rather than silently succeeding.
///
/// # Errors
///
/// Returns a [`DbError`] if the key cannot be serialized, RocksDB fails, or
/// no block is stored under `block_id`.
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
    let cf_block = self.block_column();
    let key = borsh::to_vec(&block_id).map_err(|err| {
        DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_owned()))
    })?;

    let existing = self
        .db
        .get_cf(&cf_block, &key)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
    if existing.is_none() {
        return Err(DbError::db_interaction_error(format!(
            "Block with id {block_id} not found"
        )));
    }

    self.db
        .delete_cf(&cf_block, key)
        .map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))
}
/// Loads the block stored under `block_id`, sets its `bedrock_status` to
/// `BedrockStatus::Finalized`, and writes it back under the same key.
///
/// # Errors
///
/// Returns a [`DbError`] if the block does not exist, serialization fails, or
/// RocksDB rejects the write.
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
    let Some(mut block) = self.get_block(block_id)? else {
        return Err(DbError::db_interaction_error(format!(
            "Block with id {block_id} not found"
        )));
    };
    block.bedrock_status = BedrockStatus::Finalized;

    let cf_block = self.block_column();
    let key = borsh::to_vec(&block_id).map_err(|err| {
        DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_owned()))
    })?;
    let value = borsh::to_vec(&block).map_err(|err| {
        DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_owned()))
    })?;

    self.db.put_cf(&cf_block, key, value).map_err(|rerr| {
        DbError::rocksdb_cast_message(
            rerr,
            Some(format!("Failed to mark block {block_id} as finalized")),
        )
    })
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
.map(|res| {
let (_key, value) = res.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
2026-03-04 18:42:33 +03:00
Some("Failed to get key value pair".to_owned()),
)
})?;
borsh::from_slice::<Block>(&value).map_err(|err| {
DbError::borsh_cast_message(
err,
2026-03-04 18:42:33 +03:00
Some("Failed to deserialize block data".to_owned()),
)
})
})
}
/// Stores `block` (with its `msg_id`) and the NSSA `state` in one write.
///
/// Both are staged into a single `WriteBatch`, which is then committed with
/// one `write` call.
///
/// # Errors
///
/// Returns a [`DbError`] if staging the block or state fails, or if RocksDB
/// rejects the batch write.
pub fn atomic_update(
    &self,
    block: &Block,
    msg_id: MantleMsgId,
    state: &V03State,
) -> DbResult<()> {
    let block_id = block.header.block_id;
    let mut batch = WriteBatch::default();

    self.put_block(block, msg_id, false, &mut batch)?;
    self.put_nssa_state_in_db(state, &mut batch)?;

    self.db.write(batch).map_err(|rerr| {
        DbError::rocksdb_cast_message(
            rerr,
            Some(format!("Failed to update db with block {block_id}")),
        )
    })
}
2026-02-04 14:57:38 +02:00
}