mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-02-14 10:23:08 +00:00
Merge pull request #302 from logos-blockchain/schouhy/sequencer-retries-pending-blocks
Add pending block resubmit loop to the sequencer
This commit is contained in:
commit
29ad41a645
432
Cargo.lock
generated
432
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -17,6 +17,8 @@ pub struct BackoffConfig {
|
||||
|
||||
// Simple wrapper
|
||||
// maybe extend in the future for our purposes
|
||||
// `Clone` is cheap because `CommonHttpClient` is internally reference counted (`Arc`).
|
||||
#[derive(Clone)]
|
||||
pub struct BedrockClient {
|
||||
http_client: CommonHttpClient,
|
||||
node_url: Url,
|
||||
|
||||
@ -4,6 +4,7 @@ use sha2::{Digest, Sha256, digest::FixedOutput};
|
||||
use crate::transaction::EncodedTransaction;
|
||||
|
||||
pub type HashType = [u8; 32];
|
||||
pub type MantleMsgId = [u8; 32];
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
/// Our own hasher.
|
||||
@ -49,6 +50,7 @@ pub struct Block {
|
||||
pub header: BlockHeader,
|
||||
pub body: BlockBody,
|
||||
pub bedrock_status: BedrockStatus,
|
||||
pub bedrock_parent_id: MantleMsgId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
|
||||
@ -60,7 +62,11 @@ pub struct HashableBlockData {
|
||||
}
|
||||
|
||||
impl HashableBlockData {
|
||||
pub fn into_pending_block(self, signing_key: &nssa::PrivateKey) -> Block {
|
||||
pub fn into_pending_block(
|
||||
self,
|
||||
signing_key: &nssa::PrivateKey,
|
||||
bedrock_parent_id: MantleMsgId,
|
||||
) -> Block {
|
||||
let data_bytes = borsh::to_vec(&self).unwrap();
|
||||
let signature = nssa::Signature::new(signing_key, &data_bytes);
|
||||
let hash = OwnHasher::hash(&data_bytes);
|
||||
@ -76,6 +82,7 @@ impl HashableBlockData {
|
||||
transactions: self.transactions,
|
||||
},
|
||||
bedrock_status: BedrockStatus::Pending,
|
||||
bedrock_parent_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -30,7 +30,7 @@ pub fn produce_dummy_block(
|
||||
transactions,
|
||||
};
|
||||
|
||||
block_data.into_pending_block(&sequencer_sign_key_for_testing())
|
||||
block_data.into_pending_block(&sequencer_sign_key_for_testing(), [0; 32])
|
||||
}
|
||||
|
||||
pub fn produce_dummy_empty_transaction() -> EncodedTransaction {
|
||||
|
||||
@ -599,12 +599,14 @@ impl TryFrom<common::block::Block> for Block {
|
||||
header,
|
||||
body,
|
||||
bedrock_status,
|
||||
bedrock_parent_id,
|
||||
} = value;
|
||||
|
||||
Ok(Self {
|
||||
header: header.into(),
|
||||
body: body.try_into()?,
|
||||
bedrock_status: bedrock_status.into(),
|
||||
bedrock_parent_id: MantleMsgId(bedrock_parent_id),
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -617,12 +619,14 @@ impl TryFrom<Block> for common::block::Block {
|
||||
header,
|
||||
body,
|
||||
bedrock_status,
|
||||
bedrock_parent_id,
|
||||
} = value;
|
||||
|
||||
Ok(Self {
|
||||
header: header.try_into()?,
|
||||
body: body.try_into()?,
|
||||
bedrock_status: bedrock_status.into(),
|
||||
bedrock_parent_id: bedrock_parent_id.0,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -36,6 +36,7 @@ pub struct Block {
|
||||
pub header: BlockHeader,
|
||||
pub body: BlockBody,
|
||||
pub bedrock_status: BedrockStatus,
|
||||
pub bedrock_parent_id: MantleMsgId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
|
||||
@ -188,6 +189,13 @@ pub struct Hash(
|
||||
pub [u8; 32],
|
||||
);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct MantleMsgId(
|
||||
#[serde(with = "base64::arr")]
|
||||
#[schemars(with = "String", description = "base64-encoded Bedrock message id")]
|
||||
pub [u8; 32],
|
||||
);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
|
||||
pub enum BedrockStatus {
|
||||
Pending,
|
||||
|
||||
@ -6,6 +6,7 @@
|
||||
"max_num_tx_in_block": 20,
|
||||
"mempool_max_size": 10000,
|
||||
"block_create_timeout_millis": 10000,
|
||||
"retry_pending_blocks_timeout_millis": 240000,
|
||||
"port": 0,
|
||||
"initial_accounts": [
|
||||
{
|
||||
|
||||
@ -40,6 +40,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
|
||||
pub struct TestContext {
|
||||
sequencer_server_handle: ServerHandle,
|
||||
sequencer_loop_handle: JoinHandle<Result<()>>,
|
||||
sequencer_retry_pending_blocks_handle: JoinHandle<Result<()>>,
|
||||
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
|
||||
sequencer_client: SequencerClient,
|
||||
wallet: WalletCore,
|
||||
@ -94,10 +95,15 @@ impl TestContext {
|
||||
|
||||
debug!("Test context setup");
|
||||
|
||||
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir) =
|
||||
Self::setup_sequencer(sequencer_config)
|
||||
.await
|
||||
.context("Failed to setup sequencer")?;
|
||||
let (
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
temp_sequencer_dir,
|
||||
) = Self::setup_sequencer(sequencer_config)
|
||||
.await
|
||||
.context("Failed to setup sequencer")?;
|
||||
|
||||
// Convert 0.0.0.0 to 127.0.0.1 for client connections
|
||||
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
|
||||
@ -130,6 +136,7 @@ impl TestContext {
|
||||
Ok(Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
indexer_loop_handle,
|
||||
sequencer_client,
|
||||
wallet,
|
||||
@ -140,6 +147,7 @@ impl TestContext {
|
||||
Ok(Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
indexer_loop_handle: None,
|
||||
sequencer_client,
|
||||
wallet,
|
||||
@ -151,7 +159,13 @@ impl TestContext {
|
||||
|
||||
async fn setup_sequencer(
|
||||
mut config: SequencerConfig,
|
||||
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>, TempDir)> {
|
||||
) -> Result<(
|
||||
ServerHandle,
|
||||
SocketAddr,
|
||||
JoinHandle<Result<()>>,
|
||||
JoinHandle<Result<()>>,
|
||||
TempDir,
|
||||
)> {
|
||||
let temp_sequencer_dir =
|
||||
tempfile::tempdir().context("Failed to create temp dir for sequencer home")?;
|
||||
|
||||
@ -163,13 +177,18 @@ impl TestContext {
|
||||
// Setting port to 0 lets the OS choose a free port for us
|
||||
config.port = 0;
|
||||
|
||||
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle) =
|
||||
sequencer_runner::startup_sequencer(config).await?;
|
||||
let (
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
) = sequencer_runner::startup_sequencer(config).await?;
|
||||
|
||||
Ok((
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
temp_sequencer_dir,
|
||||
))
|
||||
}
|
||||
@ -230,6 +249,7 @@ impl Drop for TestContext {
|
||||
let Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
indexer_loop_handle,
|
||||
sequencer_client: _,
|
||||
wallet: _,
|
||||
@ -238,6 +258,7 @@ impl Drop for TestContext {
|
||||
} = self;
|
||||
|
||||
sequencer_loop_handle.abort();
|
||||
sequencer_retry_pending_blocks_handle.abort();
|
||||
if let Some(indexer_loop_handle) = indexer_loop_handle {
|
||||
indexer_loop_handle.abort();
|
||||
}
|
||||
|
||||
@ -190,6 +190,7 @@ impl TpsTestManager {
|
||||
initial_commitments: vec![initial_commitment],
|
||||
signing_key: [37; 32],
|
||||
bedrock_config: None,
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,7 +5,10 @@ use serde::{Deserialize, Serialize};
|
||||
use crate::{NullifierPublicKey, account::Account};
|
||||
|
||||
#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
|
||||
#[cfg_attr(any(feature = "host", test), derive(Debug, Clone, PartialEq, Eq, Hash))]
|
||||
#[cfg_attr(
|
||||
any(feature = "host", test),
|
||||
derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)
|
||||
)]
|
||||
pub struct Commitment(pub(super) [u8; 32]);
|
||||
|
||||
/// A commitment to all zero data.
|
||||
|
||||
@ -42,7 +42,10 @@ impl From<&NullifierSecretKey> for NullifierPublicKey {
|
||||
pub type NullifierSecretKey = [u8; 32];
|
||||
|
||||
#[derive(Serialize, Deserialize, BorshSerialize, BorshDeserialize)]
|
||||
#[cfg_attr(any(feature = "host", test), derive(Debug, Clone, PartialEq, Eq, Hash))]
|
||||
#[cfg_attr(
|
||||
any(feature = "host", test),
|
||||
derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)
|
||||
)]
|
||||
pub struct Nullifier(pub(super) [u8; 32]);
|
||||
|
||||
impl Nullifier {
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use borsh::{BorshDeserialize, BorshSerialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
mod default_values;
|
||||
@ -20,6 +21,7 @@ fn hash_value(value: &Value) -> Node {
|
||||
}
|
||||
|
||||
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
|
||||
#[derive(BorshSerialize, BorshDeserialize)]
|
||||
pub struct MerkleTree {
|
||||
nodes: Vec<Node>,
|
||||
capacity: usize,
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use borsh::{BorshDeserialize, BorshSerialize};
|
||||
use nssa_core::{
|
||||
account::AccountWithMetadata,
|
||||
program::{InstructionData, ProgramId, ProgramOutput},
|
||||
@ -14,7 +15,7 @@ use crate::{
|
||||
/// TODO: Make this variable when fees are implemented
|
||||
const MAX_NUM_CYCLES_PUBLIC_EXECUTION: u64 = 1024 * 1024 * 32; // 32M cycles
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
|
||||
pub struct Program {
|
||||
id: ProgramId,
|
||||
elf: Vec<u8>,
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
|
||||
use borsh::{BorshDeserialize, BorshSerialize};
|
||||
use nssa_core::{
|
||||
Commitment, CommitmentSetDigest, DUMMY_COMMITMENT, MembershipProof, Nullifier,
|
||||
account::{Account, AccountId},
|
||||
@ -15,6 +16,8 @@ use crate::{
|
||||
|
||||
pub const MAX_NUMBER_CHAINED_CALLS: usize = 10;
|
||||
|
||||
#[derive(BorshSerialize, BorshDeserialize)]
|
||||
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
|
||||
pub(crate) struct CommitmentSet {
|
||||
merkle_tree: MerkleTree,
|
||||
commitments: HashMap<Commitment, usize>,
|
||||
@ -60,8 +63,49 @@ impl CommitmentSet {
|
||||
}
|
||||
}
|
||||
|
||||
type NullifierSet = HashSet<Nullifier>;
|
||||
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
|
||||
struct NullifierSet(BTreeSet<Nullifier>);
|
||||
|
||||
impl NullifierSet {
|
||||
fn new() -> Self {
|
||||
Self(BTreeSet::new())
|
||||
}
|
||||
|
||||
fn extend(&mut self, new_nullifiers: Vec<Nullifier>) {
|
||||
self.0.extend(new_nullifiers);
|
||||
}
|
||||
|
||||
fn contains(&self, nullifier: &Nullifier) -> bool {
|
||||
self.0.contains(nullifier)
|
||||
}
|
||||
}
|
||||
|
||||
impl BorshSerialize for NullifierSet {
|
||||
fn serialize<W: std::io::Write>(&self, writer: &mut W) -> std::io::Result<()> {
|
||||
self.0.iter().collect::<Vec<_>>().serialize(writer)
|
||||
}
|
||||
}
|
||||
|
||||
impl BorshDeserialize for NullifierSet {
|
||||
fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> std::io::Result<Self> {
|
||||
let vec = Vec::<Nullifier>::deserialize_reader(reader)?;
|
||||
|
||||
let mut set = BTreeSet::new();
|
||||
for n in vec {
|
||||
if !set.insert(n) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"duplicate nullifier in NullifierSet",
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self(set))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(BorshSerialize, BorshDeserialize)]
|
||||
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
|
||||
pub struct V02State {
|
||||
public_state: HashMap<AccountId, Account>,
|
||||
private_state: (CommitmentSet, NullifierSet),
|
||||
@ -4528,4 +4572,15 @@ pub mod tests {
|
||||
// Assert - should fail because the malicious program tries to manipulate is_authorized
|
||||
assert!(matches!(result, Err(NssaError::CircuitProvingError(_))));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_state_serialization_roundtrip() {
|
||||
let account_id_1 = AccountId::new([1; 32]);
|
||||
let account_id_2 = AccountId::new([2; 32]);
|
||||
let initial_data = [(account_id_1, 100u128), (account_id_2, 151u128)];
|
||||
let state = V02State::new_with_genesis_accounts(&initial_data, &[]).with_test_programs();
|
||||
let bytes = borsh::to_vec(&state).unwrap();
|
||||
let state_from_bytes: V02State = borsh::from_slice(&bytes).unwrap();
|
||||
assert_eq!(state, state_from_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ use std::{fs, path::Path, str::FromStr};
|
||||
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use bedrock_client::BedrockClient;
|
||||
use common::block::HashableBlockData;
|
||||
use common::block::Block;
|
||||
use logos_blockchain_core::mantle::{
|
||||
MantleTx, Op, OpProof, SignedMantleTx, Transaction, TxHash, ledger,
|
||||
ops::channel::{ChannelId, MsgId, inscribe::InscriptionOp},
|
||||
@ -15,11 +15,11 @@ use reqwest::Url;
|
||||
use crate::config::BedrockConfig;
|
||||
|
||||
/// A component that posts block data to logos blockchain
|
||||
#[derive(Clone)]
|
||||
pub struct BlockSettlementClient {
|
||||
bedrock_client: BedrockClient,
|
||||
bedrock_signing_key: Ed25519Key,
|
||||
bedrock_channel_id: ChannelId,
|
||||
last_message_id: MsgId,
|
||||
}
|
||||
|
||||
impl BlockSettlementClient {
|
||||
@ -30,25 +30,24 @@ impl BlockSettlementClient {
|
||||
.context("Bedrock node address is not a valid url")?;
|
||||
let bedrock_client =
|
||||
BedrockClient::new(None, bedrock_url).context("Failed to initialize bedrock client")?;
|
||||
let channel_genesis_msg = MsgId::from([0; 32]);
|
||||
Ok(Self {
|
||||
bedrock_client,
|
||||
bedrock_signing_key,
|
||||
bedrock_channel_id: config.channel_id,
|
||||
last_message_id: channel_genesis_msg,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create and sign a transaction for inscribing data
|
||||
pub fn create_inscribe_tx(&self, data: Vec<u8>) -> (SignedMantleTx, MsgId) {
|
||||
pub fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> {
|
||||
let inscription_data = borsh::to_vec(block)?;
|
||||
let verifying_key_bytes = self.bedrock_signing_key.public_key().to_bytes();
|
||||
let verifying_key =
|
||||
Ed25519PublicKey::from_bytes(&verifying_key_bytes).expect("valid ed25519 public key");
|
||||
|
||||
let inscribe_op = InscriptionOp {
|
||||
channel_id: self.bedrock_channel_id,
|
||||
inscription: data,
|
||||
parent: self.last_message_id,
|
||||
inscription: inscription_data,
|
||||
parent: block.bedrock_parent_id.into(),
|
||||
signer: verifying_key,
|
||||
};
|
||||
let inscribe_op_id = inscribe_op.id();
|
||||
@ -78,20 +77,17 @@ impl BlockSettlementClient {
|
||||
ledger_tx_proof: empty_ledger_signature(&tx_hash),
|
||||
mantle_tx: inscribe_tx,
|
||||
};
|
||||
(signed_mantle_tx, inscribe_op_id)
|
||||
Ok((signed_mantle_tx, inscribe_op_id))
|
||||
}
|
||||
|
||||
/// Post a transaction to the node and wait for inclusion
|
||||
pub async fn post_and_wait(&mut self, block_data: &HashableBlockData) -> Result<u64> {
|
||||
let inscription_data = borsh::to_vec(&block_data)?;
|
||||
let (tx, new_msg_id) = self.create_inscribe_tx(inscription_data);
|
||||
/// Post a transaction to the node
|
||||
pub async fn submit_block_to_bedrock(&self, block: &Block) -> Result<MsgId> {
|
||||
let (tx, new_msg_id) = self.create_inscribe_tx(block)?;
|
||||
|
||||
// Post the transaction
|
||||
self.bedrock_client.post_transaction(tx).await?;
|
||||
|
||||
self.last_message_id = new_msg_id;
|
||||
|
||||
Ok(block_data.block_id)
|
||||
Ok(new_msg_id)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -2,9 +2,10 @@ use std::{collections::HashMap, path::Path};
|
||||
|
||||
use anyhow::Result;
|
||||
use common::{HashType, block::Block, transaction::EncodedTransaction};
|
||||
use nssa::V02State;
|
||||
use storage::RocksDBIO;
|
||||
|
||||
pub struct SequencerBlockStore {
|
||||
pub struct SequencerStore {
|
||||
dbio: RocksDBIO,
|
||||
// TODO: Consider adding the hashmap to the database for faster recovery.
|
||||
tx_hash_to_block_map: HashMap<HashType, u64>,
|
||||
@ -12,7 +13,7 @@ pub struct SequencerBlockStore {
|
||||
signing_key: nssa::PrivateKey,
|
||||
}
|
||||
|
||||
impl SequencerBlockStore {
|
||||
impl SequencerStore {
|
||||
/// Starting database at the start of new chain.
|
||||
/// Creates files if necessary.
|
||||
///
|
||||
@ -42,18 +43,15 @@ impl SequencerBlockStore {
|
||||
|
||||
/// Reopening existing database
|
||||
pub fn open_db_restart(location: &Path, signing_key: nssa::PrivateKey) -> Result<Self> {
|
||||
SequencerBlockStore::open_db_with_genesis(location, None, signing_key)
|
||||
SequencerStore::open_db_with_genesis(location, None, signing_key)
|
||||
}
|
||||
|
||||
pub fn get_block_at_id(&self, id: u64) -> Result<Block> {
|
||||
Ok(self.dbio.get_block(id)?)
|
||||
}
|
||||
|
||||
pub fn put_block_at_id(&mut self, block: Block) -> Result<()> {
|
||||
let new_transactions_map = block_to_transactions_map(&block);
|
||||
self.dbio.put_block(block, false)?;
|
||||
self.tx_hash_to_block_map.extend(new_transactions_map);
|
||||
Ok(())
|
||||
pub fn delete_block_at_id(&mut self, block_id: u64) -> Result<()> {
|
||||
Ok(self.dbio.delete_block(block_id)?)
|
||||
}
|
||||
|
||||
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
|
||||
@ -81,6 +79,21 @@ impl SequencerBlockStore {
|
||||
pub fn signing_key(&self) -> &nssa::PrivateKey {
|
||||
&self.signing_key
|
||||
}
|
||||
|
||||
pub fn get_all_blocks(&self) -> impl Iterator<Item = Result<Block>> {
|
||||
self.dbio.get_all_blocks().map(|res| Ok(res?))
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, block: Block, state: &V02State) -> Result<()> {
|
||||
let new_transactions_map = block_to_transactions_map(&block);
|
||||
self.dbio.atomic_update(block, state)?;
|
||||
self.tx_hash_to_block_map.extend(new_transactions_map);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_nssa_state(&self) -> Option<V02State> {
|
||||
self.dbio.get_nssa_state().ok()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn block_to_transactions_map(block: &Block) -> HashMap<HashType, u64> {
|
||||
@ -113,11 +126,10 @@ mod tests {
|
||||
transactions: vec![],
|
||||
};
|
||||
|
||||
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key);
|
||||
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
|
||||
// Start an empty node store
|
||||
let mut node_store =
|
||||
SequencerBlockStore::open_db_with_genesis(path, Some(genesis_block), signing_key)
|
||||
.unwrap();
|
||||
SequencerStore::open_db_with_genesis(path, Some(genesis_block), signing_key).unwrap();
|
||||
|
||||
let tx = common::test_utils::produce_dummy_empty_transaction();
|
||||
let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
|
||||
@ -126,7 +138,8 @@ mod tests {
|
||||
let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
|
||||
assert_eq!(None, retrieved_tx);
|
||||
// Add the block with the transaction
|
||||
node_store.put_block_at_id(block).unwrap();
|
||||
let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
|
||||
node_store.update(block, &dummy_state).unwrap();
|
||||
// Try again
|
||||
let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
|
||||
assert_eq!(Some(tx), retrieved_tx);
|
||||
|
||||
@ -41,6 +41,8 @@ pub struct SequencerConfig {
|
||||
pub mempool_max_size: usize,
|
||||
/// Interval in which blocks produced
|
||||
pub block_create_timeout_millis: u64,
|
||||
/// Interval in which pending blocks are retried
|
||||
pub retry_pending_blocks_timeout_millis: u64,
|
||||
/// Port to listen
|
||||
pub port: u16,
|
||||
/// List of initial accounts data
|
||||
|
||||
@ -5,15 +5,15 @@ use anyhow::Result;
|
||||
use common::PINATA_BASE58;
|
||||
use common::{
|
||||
HashType,
|
||||
block::HashableBlockData,
|
||||
block::{BedrockStatus, Block, HashableBlockData, MantleMsgId},
|
||||
transaction::{EncodedTransaction, NSSATransaction},
|
||||
};
|
||||
use config::SequencerConfig;
|
||||
use log::warn;
|
||||
use log::{info, warn};
|
||||
use mempool::{MemPool, MemPoolHandle};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerBlockStore};
|
||||
use crate::{block_settlement_client::BlockSettlementClient, block_store::SequencerStore};
|
||||
|
||||
mod block_settlement_client;
|
||||
pub mod block_store;
|
||||
@ -21,11 +21,12 @@ pub mod config;
|
||||
|
||||
pub struct SequencerCore {
|
||||
state: nssa::V02State,
|
||||
block_store: SequencerBlockStore,
|
||||
store: SequencerStore,
|
||||
mempool: MemPool<EncodedTransaction>,
|
||||
sequencer_config: SequencerConfig,
|
||||
chain_height: u64,
|
||||
block_settlement_client: Option<BlockSettlementClient>,
|
||||
last_bedrock_msg_id: MantleMsgId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
@ -43,7 +44,11 @@ impl Display for TransactionMalformationError {
|
||||
impl std::error::Error for TransactionMalformationError {}
|
||||
|
||||
impl SequencerCore {
|
||||
/// Start Sequencer from configuration and construct transaction sender
|
||||
/// Starts the sequencer using the provided configuration.
|
||||
/// If an existing database is found, the sequencer state is loaded from it and
|
||||
/// assumed to represent the correct latest state consistent with Bedrock-finalized data.
|
||||
/// If no database is found, the sequencer performs a fresh start from genesis,
|
||||
/// initializing its state with the accounts defined in the configuration file.
|
||||
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) {
|
||||
let hashable_data = HashableBlockData {
|
||||
block_id: config.genesis_id,
|
||||
@ -53,37 +58,51 @@ impl SequencerCore {
|
||||
};
|
||||
|
||||
let signing_key = nssa::PrivateKey::try_new(config.signing_key).unwrap();
|
||||
let genesis_block = hashable_data.into_pending_block(&signing_key);
|
||||
let channel_genesis_msg_id = [0; 32];
|
||||
let genesis_block = hashable_data.into_pending_block(&signing_key, channel_genesis_msg_id);
|
||||
|
||||
// Sequencer should panic if unable to open db,
|
||||
// as fixing this issue may require actions non-native to program scope
|
||||
let block_store = SequencerBlockStore::open_db_with_genesis(
|
||||
let store = SequencerStore::open_db_with_genesis(
|
||||
&config.home.join("rocksdb"),
|
||||
Some(genesis_block),
|
||||
signing_key,
|
||||
)
|
||||
.unwrap();
|
||||
let mut initial_commitments = vec![];
|
||||
|
||||
for init_comm_data in config.initial_commitments.clone() {
|
||||
let npk = init_comm_data.npk;
|
||||
let mut state = match store.get_nssa_state() {
|
||||
Some(state) => {
|
||||
info!("Found local database. Loading state and pending blocks from it.");
|
||||
state
|
||||
}
|
||||
None => {
|
||||
info!(
|
||||
"No database found when starting the sequencer. Creating a fresh new with the initial data in config"
|
||||
);
|
||||
let initial_commitments: Vec<nssa_core::Commitment> = config
|
||||
.initial_commitments
|
||||
.iter()
|
||||
.map(|init_comm_data| {
|
||||
let npk = &init_comm_data.npk;
|
||||
|
||||
let mut acc = init_comm_data.account;
|
||||
let mut acc = init_comm_data.account.clone();
|
||||
|
||||
acc.program_owner = nssa::program::Program::authenticated_transfer_program().id();
|
||||
acc.program_owner =
|
||||
nssa::program::Program::authenticated_transfer_program().id();
|
||||
|
||||
let comm = nssa_core::Commitment::new(&npk, &acc);
|
||||
nssa_core::Commitment::new(npk, &acc)
|
||||
})
|
||||
.collect();
|
||||
|
||||
initial_commitments.push(comm);
|
||||
}
|
||||
let init_accs: Vec<(nssa::AccountId, u128)> = config
|
||||
.initial_accounts
|
||||
.iter()
|
||||
.map(|acc_data| (acc_data.account_id.parse().unwrap(), acc_data.balance))
|
||||
.collect();
|
||||
|
||||
let init_accs: Vec<(nssa::AccountId, u128)> = config
|
||||
.initial_accounts
|
||||
.iter()
|
||||
.map(|acc_data| (acc_data.account_id.parse().unwrap(), acc_data.balance))
|
||||
.collect();
|
||||
|
||||
let mut state = nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments);
|
||||
nssa::V02State::new_with_genesis_accounts(&init_accs, &initial_commitments)
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(feature = "testnet")]
|
||||
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
|
||||
@ -94,37 +113,17 @@ impl SequencerCore {
|
||||
.expect("Block settlement client should be constructible")
|
||||
});
|
||||
|
||||
let mut this = Self {
|
||||
let sequencer_core = Self {
|
||||
state,
|
||||
block_store,
|
||||
store,
|
||||
mempool,
|
||||
chain_height: config.genesis_id,
|
||||
sequencer_config: config,
|
||||
block_settlement_client,
|
||||
last_bedrock_msg_id: channel_genesis_msg_id,
|
||||
};
|
||||
|
||||
this.sync_state_with_stored_blocks();
|
||||
|
||||
(this, mempool_handle)
|
||||
}
|
||||
|
||||
/// If there are stored blocks ahead of the current height, this method will load and process
|
||||
/// all transaction in them in the order they are stored. The NSSA state will be updated
|
||||
/// accordingly.
|
||||
fn sync_state_with_stored_blocks(&mut self) {
|
||||
let mut next_block_id = self.sequencer_config.genesis_id + 1;
|
||||
while let Ok(block) = self.block_store.get_block_at_id(next_block_id) {
|
||||
for encoded_transaction in block.body.transactions {
|
||||
let transaction = NSSATransaction::try_from(&encoded_transaction).unwrap();
|
||||
// Process transaction and update state
|
||||
self.execute_check_transaction_on_state(transaction)
|
||||
.unwrap();
|
||||
// Update the tx hash to block id map.
|
||||
self.block_store.insert(&encoded_transaction, next_block_id);
|
||||
}
|
||||
self.chain_height = next_block_id;
|
||||
next_block_id += 1;
|
||||
}
|
||||
(sequencer_core, mempool_handle)
|
||||
}
|
||||
|
||||
fn execute_check_transaction_on_state(
|
||||
@ -148,8 +147,11 @@ impl SequencerCore {
|
||||
pub async fn produce_new_block_and_post_to_settlement_layer(&mut self) -> Result<u64> {
|
||||
let block_data = self.produce_new_block_with_mempool_transactions()?;
|
||||
|
||||
if let Some(block_settlement) = self.block_settlement_client.as_mut() {
|
||||
block_settlement.post_and_wait(&block_data).await?;
|
||||
if let Some(client) = self.block_settlement_client.as_mut() {
|
||||
let block =
|
||||
block_data.into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id);
|
||||
let msg_id = client.submit_block_to_bedrock(&block).await?;
|
||||
self.last_bedrock_msg_id = msg_id.into();
|
||||
log::info!("Posted block data to Bedrock");
|
||||
}
|
||||
|
||||
@ -179,11 +181,7 @@ impl SequencerCore {
|
||||
}
|
||||
}
|
||||
|
||||
let prev_block_hash = self
|
||||
.block_store
|
||||
.get_block_at_id(self.chain_height)?
|
||||
.header
|
||||
.hash;
|
||||
let prev_block_hash = self.store.get_block_at_id(self.chain_height)?.header.hash;
|
||||
|
||||
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
|
||||
|
||||
@ -196,9 +194,9 @@ impl SequencerCore {
|
||||
|
||||
let block = hashable_data
|
||||
.clone()
|
||||
.into_pending_block(self.block_store.signing_key());
|
||||
.into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id);
|
||||
|
||||
self.block_store.put_block_at_id(block)?;
|
||||
self.store.update(block, &self.state)?;
|
||||
|
||||
self.chain_height = new_block_height;
|
||||
|
||||
@ -224,8 +222,8 @@ impl SequencerCore {
|
||||
&self.state
|
||||
}
|
||||
|
||||
pub fn block_store(&self) -> &SequencerBlockStore {
|
||||
&self.block_store
|
||||
pub fn block_store(&self) -> &SequencerStore {
|
||||
&self.store
|
||||
}
|
||||
|
||||
pub fn chain_height(&self) -> u64 {
|
||||
@ -235,6 +233,39 @@ impl SequencerCore {
|
||||
pub fn sequencer_config(&self) -> &SequencerConfig {
|
||||
&self.sequencer_config
|
||||
}
|
||||
|
||||
/// Deletes finalized blocks from the sequencer's pending block list.
|
||||
/// This method must be called when new blocks are finalized on Bedrock.
|
||||
/// All pending blocks with an ID less than or equal to `last_finalized_block_id`
|
||||
/// are removed from the database.
|
||||
pub fn clean_finalized_blocks_from_db(&mut self, last_finalized_block_id: u64) -> Result<()> {
|
||||
if let Some(first_pending_block_id) = self
|
||||
.get_pending_blocks()?
|
||||
.iter()
|
||||
.map(|block| block.header.block_id)
|
||||
.min()
|
||||
{
|
||||
(first_pending_block_id..=last_finalized_block_id)
|
||||
.try_for_each(|id| self.store.delete_block_at_id(id))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the list of stored pending blocks.
|
||||
pub fn get_pending_blocks(&self) -> Result<Vec<Block>> {
|
||||
Ok(self
|
||||
.store
|
||||
.get_all_blocks()
|
||||
.collect::<Result<Vec<Block>>>()?
|
||||
.into_iter()
|
||||
.filter(|block| matches!(block.bedrock_status, BedrockStatus::Pending))
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn block_settlement_client(&self) -> Option<BlockSettlementClient> {
|
||||
self.block_settlement_client.clone()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
|
||||
@ -297,6 +328,7 @@ mod tests {
|
||||
initial_commitments: vec![],
|
||||
signing_key: *sequencer_sign_key_for_testing().value(),
|
||||
bedrock_config: None,
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
}
|
||||
}
|
||||
|
||||
@ -680,10 +712,7 @@ mod tests {
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap()
|
||||
.block_id;
|
||||
let block = sequencer
|
||||
.block_store
|
||||
.get_block_at_id(current_height)
|
||||
.unwrap();
|
||||
let block = sequencer.store.get_block_at_id(current_height).unwrap();
|
||||
|
||||
// Only one should be included in the block
|
||||
assert_eq!(block.body.transactions, vec![tx.clone()]);
|
||||
@ -720,10 +749,7 @@ mod tests {
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap()
|
||||
.block_id;
|
||||
let block = sequencer
|
||||
.block_store
|
||||
.get_block_at_id(current_height)
|
||||
.unwrap();
|
||||
let block = sequencer.store.get_block_at_id(current_height).unwrap();
|
||||
assert_eq!(block.body.transactions, vec![tx.clone()]);
|
||||
|
||||
// Add same transaction should fail
|
||||
@ -732,10 +758,7 @@ mod tests {
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap()
|
||||
.block_id;
|
||||
let block = sequencer
|
||||
.block_store
|
||||
.get_block_at_id(current_height)
|
||||
.unwrap();
|
||||
let block = sequencer.store.get_block_at_id(current_height).unwrap();
|
||||
assert!(block.body.transactions.is_empty());
|
||||
}
|
||||
|
||||
@ -768,10 +791,7 @@ mod tests {
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap()
|
||||
.block_id;
|
||||
let block = sequencer
|
||||
.block_store
|
||||
.get_block_at_id(current_height)
|
||||
.unwrap();
|
||||
let block = sequencer.store.get_block_at_id(current_height).unwrap();
|
||||
assert_eq!(block.body.transactions, vec![tx.clone()]);
|
||||
}
|
||||
|
||||
@ -791,4 +811,42 @@ mod tests {
|
||||
config.initial_accounts[1].balance + balance_to_move
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_pending_blocks() {
|
||||
let config = setup_sequencer_config();
|
||||
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config);
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_delete_blocks() {
|
||||
let config = setup_sequencer_config();
|
||||
let (mut sequencer, _mempool_handle) = SequencerCore::start_from_config(config);
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
sequencer
|
||||
.produce_new_block_with_mempool_transactions()
|
||||
.unwrap();
|
||||
|
||||
let last_finalized_block = 3;
|
||||
sequencer
|
||||
.clean_finalized_blocks_from_db(last_finalized_block)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -405,6 +405,7 @@ mod tests {
|
||||
initial_accounts,
|
||||
initial_commitments: vec![],
|
||||
signing_key: *sequencer_sign_key_for_testing().value(),
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
bedrock_config: Some(BedrockConfig {
|
||||
channel_id: [42; 32].into(),
|
||||
node_url: "http://localhost:8080".to_string(),
|
||||
|
||||
@ -5,7 +5,8 @@
|
||||
"is_genesis_random": true,
|
||||
"max_num_tx_in_block": 20,
|
||||
"mempool_max_size": 1000,
|
||||
"block_create_timeout_millis": 10000,
|
||||
"block_create_timeout_millis": 5000,
|
||||
"retry_pending_blocks_timeout_millis": 7000,
|
||||
"port": 3040,
|
||||
"initial_accounts": [
|
||||
{
|
||||
|
||||
@ -4,7 +4,7 @@ use actix_web::dev::ServerHandle;
|
||||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use common::rpc_primitives::RpcConfig;
|
||||
use log::info;
|
||||
use log::{info, warn};
|
||||
use sequencer_core::{SequencerCore, config::SequencerConfig};
|
||||
use sequencer_rpc::new_http_server;
|
||||
use tokio::{sync::Mutex, task::JoinHandle};
|
||||
@ -20,8 +20,14 @@ struct Args {
|
||||
|
||||
pub async fn startup_sequencer(
|
||||
app_config: SequencerConfig,
|
||||
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>)> {
|
||||
) -> Result<(
|
||||
ServerHandle,
|
||||
SocketAddr,
|
||||
JoinHandle<Result<()>>,
|
||||
JoinHandle<Result<()>>,
|
||||
)> {
|
||||
let block_timeout = app_config.block_create_timeout_millis;
|
||||
let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout_millis;
|
||||
let port = app_config.port;
|
||||
|
||||
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
|
||||
@ -39,8 +45,41 @@ pub async fn startup_sequencer(
|
||||
let http_server_handle = http_server.handle();
|
||||
tokio::spawn(http_server);
|
||||
|
||||
info!("Starting main sequencer loop");
|
||||
info!("Starting pending block retry loop");
|
||||
let seq_core_wrapped_for_block_retry = seq_core_wrapped.clone();
|
||||
let retry_pending_blocks_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
retry_pending_blocks_timeout,
|
||||
))
|
||||
.await;
|
||||
|
||||
let (pending_blocks, block_settlement_client) = {
|
||||
let sequencer_core = seq_core_wrapped_for_block_retry.lock().await;
|
||||
let client = sequencer_core.block_settlement_client();
|
||||
let pending_blocks = sequencer_core
|
||||
.get_pending_blocks()
|
||||
.expect("Sequencer should be able to retrieve pending blocks");
|
||||
(pending_blocks, client)
|
||||
};
|
||||
|
||||
let Some(client) = block_settlement_client else {
|
||||
continue;
|
||||
};
|
||||
|
||||
info!("Resubmitting {} pending blocks", pending_blocks.len());
|
||||
for block in &pending_blocks {
|
||||
if let Err(e) = client.submit_block_to_bedrock(block).await {
|
||||
warn!(
|
||||
"Failed to resubmit block with id {} with error {}",
|
||||
block.header.block_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
info!("Starting main sequencer loop");
|
||||
let main_loop_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await;
|
||||
@ -61,7 +100,12 @@ pub async fn startup_sequencer(
|
||||
}
|
||||
});
|
||||
|
||||
Ok((http_server_handle, addr, main_loop_handle))
|
||||
Ok((
|
||||
http_server_handle,
|
||||
addr,
|
||||
main_loop_handle,
|
||||
retry_pending_blocks_handle,
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn main_runner() -> Result<()> {
|
||||
@ -81,9 +125,26 @@ pub async fn main_runner() -> Result<()> {
|
||||
}
|
||||
|
||||
// ToDo: Add restart on failures
|
||||
let (_, _, main_loop_handle) = startup_sequencer(app_config).await?;
|
||||
let (_, _, main_loop_handle, retry_loop_handle) = startup_sequencer(app_config).await?;
|
||||
|
||||
main_loop_handle.await??;
|
||||
info!("Sequencer running. Monitoring concurrent tasks...");
|
||||
|
||||
tokio::select! {
|
||||
res = main_loop_handle => {
|
||||
match res {
|
||||
Ok(inner_res) => warn!("Main loop exited unexpectedly: {:?}", inner_res),
|
||||
Err(e) => warn!("Main loop task panicked: {:?}", e),
|
||||
}
|
||||
}
|
||||
res = retry_loop_handle => {
|
||||
match res {
|
||||
Ok(inner_res) => warn!("Retry loop exited unexpectedly: {:?}", inner_res),
|
||||
Err(e) => warn!("Retry loop task panicked: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Shutting down sequencer...");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -9,3 +9,4 @@ common.workspace = true
|
||||
thiserror.workspace = true
|
||||
borsh.workspace = true
|
||||
rocksdb.workspace = true
|
||||
nssa.workspace = true
|
||||
|
||||
@ -2,8 +2,9 @@ use std::{path::Path, sync::Arc};
|
||||
|
||||
use common::block::Block;
|
||||
use error::DbError;
|
||||
use nssa::V02State;
|
||||
use rocksdb::{
|
||||
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options,
|
||||
BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, WriteBatch,
|
||||
};
|
||||
|
||||
pub mod error;
|
||||
@ -29,15 +30,15 @@ pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
|
||||
/// Key base for storing metainformation about the last finalized block on Bedrock
|
||||
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
|
||||
|
||||
/// Key base for storing snapshot which describe block id
|
||||
pub const DB_SNAPSHOT_BLOCK_ID_KEY: &str = "block_id";
|
||||
/// Key base for storing the NSSA state
|
||||
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
|
||||
|
||||
/// Name of block column family
|
||||
pub const CF_BLOCK_NAME: &str = "cf_block";
|
||||
/// Name of meta column family
|
||||
pub const CF_META_NAME: &str = "cf_meta";
|
||||
/// Name of snapshot column family
|
||||
pub const CF_SNAPSHOT_NAME: &str = "cf_snapshot";
|
||||
/// Name of state column family
|
||||
pub const CF_NSSA_STATE_NAME: &str = "cf_nssa_state";
|
||||
|
||||
pub type DbResult<T> = Result<T, DbError>;
|
||||
|
||||
@ -52,7 +53,7 @@ impl RocksDBIO {
|
||||
// ToDo: Add more column families for different data
|
||||
let cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
|
||||
let cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
|
||||
let cfsnapshot = ColumnFamilyDescriptor::new(CF_SNAPSHOT_NAME, cf_opts.clone());
|
||||
let cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
|
||||
|
||||
let mut db_opts = Options::default();
|
||||
db_opts.create_missing_column_families(true);
|
||||
@ -60,7 +61,7 @@ impl RocksDBIO {
|
||||
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
path,
|
||||
vec![cfb, cfmeta, cfsnapshot],
|
||||
vec![cfb, cfmeta, cfstate],
|
||||
);
|
||||
|
||||
let dbio = Self {
|
||||
@ -92,7 +93,7 @@ impl RocksDBIO {
|
||||
// ToDo: Add more column families for different data
|
||||
let _cfb = ColumnFamilyDescriptor::new(CF_BLOCK_NAME, cf_opts.clone());
|
||||
let _cfmeta = ColumnFamilyDescriptor::new(CF_META_NAME, cf_opts.clone());
|
||||
let _cfsnapshot = ColumnFamilyDescriptor::new(CF_SNAPSHOT_NAME, cf_opts.clone());
|
||||
let _cfstate = ColumnFamilyDescriptor::new(CF_NSSA_STATE_NAME, cf_opts.clone());
|
||||
|
||||
let mut db_opts = Options::default();
|
||||
db_opts.create_missing_column_families(true);
|
||||
@ -109,8 +110,8 @@ impl RocksDBIO {
|
||||
self.db.cf_handle(CF_BLOCK_NAME).unwrap()
|
||||
}
|
||||
|
||||
pub fn snapshot_column(&self) -> Arc<BoundColumnFamily<'_>> {
|
||||
self.db.cf_handle(CF_SNAPSHOT_NAME).unwrap()
|
||||
pub fn nssa_state_column(&self) -> Arc<BoundColumnFamily<'_>> {
|
||||
self.db.cf_handle(CF_NSSA_STATE_NAME).unwrap()
|
||||
}
|
||||
|
||||
pub fn get_meta_first_block_in_db(&self) -> DbResult<u64> {
|
||||
@ -189,6 +190,24 @@ impl RocksDBIO {
|
||||
Ok(res.is_some())
|
||||
}
|
||||
|
||||
pub fn put_nssa_state_in_db(&self, state: &V02State, batch: &mut WriteBatch) -> DbResult<()> {
|
||||
let cf_nssa_state = self.nssa_state_column();
|
||||
batch.put_cf(
|
||||
&cf_nssa_state,
|
||||
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize DB_NSSA_STATE_KEY".to_string()),
|
||||
)
|
||||
})?,
|
||||
borsh::to_vec(state).map_err(|err| {
|
||||
DbError::borsh_cast_message(err, Some("Failed to serialize NSSA state".to_string()))
|
||||
})?,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_meta_first_block_in_db(&self, block: Block) -> DbResult<()> {
|
||||
let cf_meta = self.meta_column();
|
||||
self.db
|
||||
@ -209,7 +228,15 @@ impl RocksDBIO {
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
self.put_block(block, true)?;
|
||||
let mut batch = WriteBatch::default();
|
||||
self.put_block(block, true, &mut batch)?;
|
||||
self.db.write(batch).map_err(|rerr| {
|
||||
DbError::rocksdb_cast_message(
|
||||
rerr,
|
||||
Some("Failed to write first block in db".to_string()),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -274,7 +301,7 @@ impl RocksDBIO {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_block(&self, block: Block, first: bool) -> DbResult<()> {
|
||||
pub fn put_block(&self, block: Block, first: bool, batch: &mut WriteBatch) -> DbResult<()> {
|
||||
let cf_block = self.block_column();
|
||||
|
||||
if !first {
|
||||
@ -285,23 +312,15 @@ impl RocksDBIO {
|
||||
}
|
||||
}
|
||||
|
||||
self.db
|
||||
.put_cf(
|
||||
&cf_block,
|
||||
borsh::to_vec(&block.header.block_id).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize block id".to_string()),
|
||||
)
|
||||
})?,
|
||||
borsh::to_vec(&block).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize block data".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
batch.put_cf(
|
||||
&cf_block,
|
||||
borsh::to_vec(&block.header.block_id).map_err(|err| {
|
||||
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
|
||||
})?,
|
||||
borsh::to_vec(&block).map_err(|err| {
|
||||
DbError::borsh_cast_message(err, Some("Failed to serialize block data".to_string()))
|
||||
})?,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -334,32 +353,90 @@ impl RocksDBIO {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_snapshot_block_id(&self) -> DbResult<u64> {
|
||||
let cf_snapshot = self.snapshot_column();
|
||||
pub fn get_nssa_state(&self) -> DbResult<V02State> {
|
||||
let cf_nssa_state = self.nssa_state_column();
|
||||
let res = self
|
||||
.db
|
||||
.get_cf(
|
||||
&cf_snapshot,
|
||||
borsh::to_vec(&DB_SNAPSHOT_BLOCK_ID_KEY).map_err(|err| {
|
||||
&cf_nssa_state,
|
||||
borsh::to_vec(&DB_NSSA_STATE_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize DB_SNAPSHOT_BLOCK_ID_KEY".to_string()),
|
||||
Some("Failed to serialize block id".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
if let Some(data) = res {
|
||||
Ok(borsh::from_slice::<u64>(&data).map_err(|err| {
|
||||
Ok(borsh::from_slice::<V02State>(&data).map_err(|serr| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to deserialize last block".to_string()),
|
||||
serr,
|
||||
Some("Failed to deserialize block data".to_string()),
|
||||
)
|
||||
})?)
|
||||
} else {
|
||||
Err(DbError::db_interaction_error(
|
||||
"Snapshot block ID not found".to_string(),
|
||||
"Block on this id not found".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_block(&self, block_id: u64) -> DbResult<()> {
|
||||
let cf_block = self.block_column();
|
||||
let key = borsh::to_vec(&block_id).map_err(|err| {
|
||||
DbError::borsh_cast_message(err, Some("Failed to serialize block id".to_string()))
|
||||
})?;
|
||||
|
||||
if self
|
||||
.db
|
||||
.get_cf(&cf_block, &key)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?
|
||||
.is_none()
|
||||
{
|
||||
return Err(DbError::db_interaction_error(
|
||||
"Block on this id not found".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
self.db
|
||||
.delete_cf(&cf_block, key)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
|
||||
let cf_block = self.block_column();
|
||||
self.db
|
||||
.iterator_cf(&cf_block, rocksdb::IteratorMode::Start)
|
||||
.map(|res| {
|
||||
let (_key, value) = res.map_err(|rerr| {
|
||||
DbError::rocksdb_cast_message(
|
||||
rerr,
|
||||
Some("Failed to get key value pair".to_string()),
|
||||
)
|
||||
})?;
|
||||
|
||||
borsh::from_slice::<Block>(&value).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to deserialize block data".to_string()),
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
pub fn atomic_update(&self, block: Block, state: &V02State) -> DbResult<()> {
|
||||
let block_id = block.header.block_id;
|
||||
let mut batch = WriteBatch::default();
|
||||
self.put_block(block, false, &mut batch)?;
|
||||
self.put_nssa_state_in_db(state, &mut batch)?;
|
||||
self.db.write(batch).map_err(|rerr| {
|
||||
DbError::rocksdb_cast_message(
|
||||
rerr,
|
||||
Some(format!("Failed to udpate db with block {block_id}")),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user