Merge pull request #333 from logos-blockchain/arjentix/fix-sequencer-msg-id

Fix sequencer data synchronization problems: persist each block's Bedrock msg_id (as part of a stored `BlockMeta`) so the sequencer resumes from the correct chain height, parent hash, and parent msg_id after a restart.
Sergio Chouhy 2026-02-16 08:32:50 -03:00 committed by GitHub
commit d8a8bdfc97
53 changed files with 806 additions and 278 deletions

.gitignore (vendored): 1 line changed
View File

@ -10,3 +10,4 @@ sequencer_runner/data/
storage.json
result
wallet-ffi/wallet_ffi.h
bedrock_signing_key

Cargo.lock (generated): 10 lines changed
View File

@ -2484,7 +2484,6 @@ dependencies = [
"console_error_panic_hook",
"console_log",
"env_logger",
"hex",
"indexer_service_protocol",
"indexer_service_rpc",
"jsonrpsee",
@ -3427,12 +3426,16 @@ dependencies = [
name = "indexer_service_protocol"
version = "0.1.0"
dependencies = [
"anyhow",
"base58",
"base64 0.22.1",
"common",
"hex",
"nssa",
"nssa_core",
"schemars 1.2.0",
"serde",
"serde_with",
]
[[package]]
@ -3856,9 +3859,9 @@ dependencies = [
[[package]]
name = "keccak"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654"
checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653"
dependencies = [
"cpufeatures",
]
@ -8257,7 +8260,6 @@ dependencies = [
"amm_core",
"anyhow",
"async-stream",
"base58",
"base64 0.22.1",
"borsh",
"bytemuck",

View File

@ -135,10 +135,10 @@ The sequencer and node can be run locally:
1. On one terminal go to the `logos-blockchain/logos-blockchain` repo and run a local logos blockchain node:
- `git checkout master; git pull`
- `cargo clean`
- `rm ~/.logos-blockchain-circuits`
- `rm -r ~/.logos-blockchain-circuits`
- `./scripts/setup-logos-blockchain-circuits.sh`
- `cargo build --all-features`
- `./target/debug/logos-blockchain-node nodes/node/config-one-node.yaml`
- `./target/debug/logos-blockchain-node --deployment nodes/node/standalone-deployment-config.yaml nodes/node/standalone-node-config.yaml`
2. On another terminal go to the `logos-blockchain/lssa` repo and run indexer service:
- `RUST_LOG=info cargo run --release -p indexer_service indexer/service/configs/indexer_config.json`

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -62,7 +62,6 @@ impl BedrockClient {
Retry::spawn(self.backoff_strategy(), || {
self.http_client
.post_transaction(self.node_url.clone(), tx.clone())
.inspect_err(|err| warn!("Transaction posting failed with error: {err:#}"))
})
.await
}

View File

@ -4,6 +4,16 @@ use sha2::{Digest, Sha256, digest::FixedOutput};
use crate::{HashType, transaction::NSSATransaction};
pub type MantleMsgId = [u8; 32];
pub type BlockHash = HashType;
pub type BlockId = u64;
pub type TimeStamp = u64;
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub struct BlockMeta {
pub id: BlockId,
pub hash: BlockHash,
pub msg_id: MantleMsgId,
}
#[derive(Debug, Clone)]
/// Our own hasher.
@ -19,10 +29,6 @@ impl OwnHasher {
}
}
pub type BlockHash = HashType;
pub type BlockId = u64;
pub type TimeStamp = u64;
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub struct BlockHeader {
pub block_id: BlockId,
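The new `BlockMeta` ties a block's id and hash to the Bedrock msg_id it was inscribed under, and is what the store hands back after a restart. A minimal sketch of constructing and Borsh round-tripping one (import paths as in the `block_store` diff below; values illustrative):

```rust
use common::{
    HashType,
    block::{BlockMeta, MantleMsgId},
};

fn main() {
    let msg_id: MantleMsgId = [1u8; 32];
    let meta = BlockMeta {
        id: 42,
        hash: HashType([0xab; 32]),
        msg_id,
    };
    // Borsh round-trip, as performed by the RocksDB layer further down.
    let bytes = borsh::to_vec(&meta).unwrap();
    let decoded: BlockMeta = borsh::from_slice(&bytes).unwrap();
    assert_eq!(decoded.id, meta.id);
    assert_eq!(decoded.hash, meta.hash);
    assert_eq!(decoded.msg_id, meta.msg_id);
}
```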

View File

@ -13,25 +13,13 @@ pub struct SequencerRpcError {
#[derive(thiserror::Error, Debug)]
pub enum SequencerClientError {
#[error("HTTP error")]
HTTPError(reqwest::Error),
HTTPError(#[from] reqwest::Error),
#[error("Serde error")]
SerdeError(serde_json::Error),
#[error("Internal error")]
SerdeError(#[from] serde_json::Error),
#[error("Internal error: {0:?}")]
InternalError(SequencerRpcError),
}
impl From<reqwest::Error> for SequencerClientError {
fn from(value: reqwest::Error) -> Self {
SequencerClientError::HTTPError(value)
}
}
impl From<serde_json::Error> for SequencerClientError {
fn from(value: serde_json::Error) -> Self {
SequencerClientError::SerdeError(value)
}
}
impl From<SequencerRpcError> for SequencerClientError {
fn from(value: SequencerRpcError) -> Self {
SequencerClientError::InternalError(value)

View File

@ -26,9 +26,6 @@ console_log = "1.0"
# Date/Time
chrono.workspace = true
# Hex encoding/decoding
hex.workspace = true
# URL encoding
urlencoding = "2.1"

View File

@ -1,3 +1,5 @@
use std::str::FromStr as _;
use indexer_service_protocol::{Account, AccountId, Block, BlockId, HashType, Transaction};
use leptos::prelude::*;
use serde::{Deserialize, Serialize};
@ -25,13 +27,6 @@ pub async fn get_account(account_id: AccountId) -> Result<Account, ServerFnError
.map_err(|e| ServerFnError::ServerError(format!("RPC error: {}", e)))
}
/// Parse hex string to bytes
#[cfg(feature = "ssr")]
fn parse_hex(s: &str) -> Option<Vec<u8>> {
let s = s.trim().trim_start_matches("0x");
hex::decode(s).ok()
}
/// Search for a block, transaction, or account by query string
#[server]
pub async fn search(query: String) -> Result<SearchResults, ServerFnError> {
@ -42,12 +37,8 @@ pub async fn search(query: String) -> Result<SearchResults, ServerFnError> {
let mut transactions = Vec::new();
let mut accounts = Vec::new();
// Try to parse as hash (32 bytes)
if let Some(bytes) = parse_hex(&query)
&& let Ok(hash_array) = <[u8; 32]>::try_from(bytes)
{
let hash = HashType(hash_array);
// Try as hash
if let Ok(hash) = HashType::from_str(&query) {
// Try as block hash
if let Ok(block) = client.get_block_by_hash(hash).await {
blocks.push(block);
@ -57,12 +48,13 @@ pub async fn search(query: String) -> Result<SearchResults, ServerFnError> {
if let Ok(tx) = client.get_transaction(hash).await {
transactions.push(tx);
}
}
// Try as account ID
let account_id = AccountId { value: hash_array };
if let Ok(account) = client.get_account(account_id).await {
accounts.push((account_id, account));
}
// Try as account ID
if let Ok(account_id) = AccountId::from_str(&query)
&& let Ok(account) = client.get_account(account_id).await
{
accounts.push((account_id, account));
}
// Try as block ID

View File

@ -2,12 +2,10 @@ use indexer_service_protocol::{Account, AccountId};
use leptos::prelude::*;
use leptos_router::components::A;
use crate::format_utils;
/// Account preview component
#[component]
pub fn AccountPreview(account_id: AccountId, account: Account) -> impl IntoView {
let account_id_str = format_utils::format_account_id(&account_id);
let account_id_str = account_id.to_string();
view! {
<div class="account-preview">
@ -20,7 +18,7 @@ pub fn AccountPreview(account_id: AccountId, account: Account) -> impl IntoView
</div>
{move || {
let Account { program_owner, balance, data, nonce } = &account;
let program_id = format_utils::format_program_id(program_owner);
let program_id = program_owner.to_string();
view! {
<div class="account-preview-body">
<div class="account-field">

View File

@ -32,8 +32,8 @@ pub fn BlockPreview(block: Block) -> impl IntoView {
let tx_count = transactions.len();
let hash_str = hex::encode(hash.0);
let prev_hash_str = hex::encode(prev_block_hash.0);
let hash_str = hash.to_string();
let prev_hash_str = prev_block_hash.to_string();
let time_str = format_utils::format_timestamp(timestamp);
let status_str = match &bedrock_status {
BedrockStatus::Pending => "Pending",

View File

@ -15,7 +15,7 @@ fn transaction_type_info(tx: &Transaction) -> (&'static str, &'static str) {
#[component]
pub fn TransactionPreview(transaction: Transaction) -> impl IntoView {
let hash = transaction.hash();
let hash_str = hex::encode(hash.0);
let hash_str = hash.to_string();
let (type_name, type_class) = transaction_type_info(&transaction);
// Get additional metadata based on transaction type

View File

@ -1,7 +1,5 @@
//! Formatting utilities for the explorer
use indexer_service_protocol::{AccountId, ProgramId};
/// Format timestamp to human-readable string
pub fn format_timestamp(timestamp: u64) -> String {
let seconds = timestamp / 1000;
@ -9,25 +7,3 @@ pub fn format_timestamp(timestamp: u64) -> String {
.unwrap_or_else(|| chrono::DateTime::from_timestamp(0, 0).unwrap());
datetime.format("%Y-%m-%d %H:%M:%S UTC").to_string()
}
/// Format hash (32 bytes) to hex string
pub fn format_hash(hash: &[u8; 32]) -> String {
hex::encode(hash)
}
/// Format account ID to hex string
pub fn format_account_id(account_id: &AccountId) -> String {
hex::encode(account_id.value)
}
/// Format program ID to hex string
pub fn format_program_id(program_id: &ProgramId) -> String {
let bytes: Vec<u8> = program_id.iter().flat_map(|n| n.to_be_bytes()).collect();
hex::encode(bytes)
}
/// Parse hex string to bytes
pub fn parse_hex(s: &str) -> Option<Vec<u8>> {
let s = s.trim().trim_start_matches("0x");
hex::decode(s).ok()
}

View File

@ -1,8 +1,10 @@
use std::str::FromStr as _;
use indexer_service_protocol::{Account, AccountId};
use leptos::prelude::*;
use leptos_router::hooks::use_params_map;
use crate::{api, components::TransactionPreview, format_utils};
use crate::{api, components::TransactionPreview};
/// Account page component
#[component]
@ -17,16 +19,7 @@ pub fn AccountPage() -> impl IntoView {
// Parse account ID from URL params
let account_id = move || {
let account_id_str = params.read().get("id").unwrap_or_default();
format_utils::parse_hex(&account_id_str).and_then(|bytes| {
if bytes.len() == 32 {
let account_id_array: [u8; 32] = bytes.try_into().ok()?;
Some(AccountId {
value: account_id_array,
})
} else {
None
}
})
AccountId::from_str(&account_id_str).ok()
};
// Load account data
@ -98,8 +91,8 @@ pub fn AccountPage() -> impl IntoView {
} = acc;
let acc_id = account_id().expect("Account ID should be set");
let account_id_str = format_utils::format_account_id(&acc_id);
let program_id = format_utils::format_program_id(&program_owner);
let account_id_str = acc_id.to_string();
let program_id = program_owner.to_string();
let balance_str = balance.to_string();
let nonce_str = nonce.to_string();
let data_len = data.0.len();

View File

@ -1,3 +1,5 @@
use std::str::FromStr as _;
use indexer_service_protocol::{BedrockStatus, Block, BlockBody, BlockHeader, BlockId, HashType};
use leptos::prelude::*;
use leptos_router::{components::A, hooks::use_params_map};
@ -25,11 +27,8 @@ pub fn BlockPage() -> impl IntoView {
}
// Try to parse as block hash (hex string)
let id_str = id_str.trim().trim_start_matches("0x");
if let Some(bytes) = format_utils::parse_hex(id_str)
&& let Ok(hash_array) = <[u8; 32]>::try_from(bytes)
{
return Some(BlockIdOrHash::Hash(HashType(hash_array)));
if let Ok(hash) = HashType::from_str(&id_str) {
return Some(BlockIdOrHash::Hash(hash));
}
None
@ -68,10 +67,10 @@ pub fn BlockPage() -> impl IntoView {
bedrock_parent_id: _,
} = blk;
let hash_str = format_utils::format_hash(&hash.0);
let prev_hash = format_utils::format_hash(&prev_block_hash.0);
let hash_str = hash.to_string();
let prev_hash = prev_block_hash.to_string();
let timestamp_str = format_utils::format_timestamp(timestamp);
let signature_str = hex::encode(signature.0);
let signature_str = signature.to_string();
let status = match &bedrock_status {
BedrockStatus::Pending => "Pending",
BedrockStatus::Safe => "Safe",

View File

@ -1,3 +1,5 @@
use std::str::FromStr as _;
use indexer_service_protocol::{
HashType, PrivacyPreservingMessage, PrivacyPreservingTransaction, ProgramDeploymentMessage,
ProgramDeploymentTransaction, PublicMessage, PublicTransaction, Transaction, WitnessSet,
@ -5,7 +7,7 @@ use indexer_service_protocol::{
use leptos::prelude::*;
use leptos_router::{components::A, hooks::use_params_map};
use crate::{api, format_utils};
use crate::api;
/// Transaction page component
#[component]
@ -14,15 +16,10 @@ pub fn TransactionPage() -> impl IntoView {
let transaction_resource = Resource::new(
move || {
let tx_hash_str = params.read().get("hash").unwrap_or_default();
format_utils::parse_hex(&tx_hash_str).and_then(|bytes| {
if bytes.len() == 32 {
let hash_array: [u8; 32] = bytes.try_into().ok()?;
Some(HashType(hash_array))
} else {
None
}
})
params
.read()
.get("hash")
.and_then(|s| HashType::from_str(&s).ok())
},
|hash_opt| async move {
match hash_opt {
@ -42,7 +39,7 @@ pub fn TransactionPage() -> impl IntoView {
.get()
.map(|result| match result {
Ok(tx) => {
let tx_hash = format_utils::format_hash(&tx.hash().0);
let tx_hash = tx.hash().to_string();
let tx_type = match &tx {
Transaction::Public(_) => "Public Transaction",
Transaction::PrivacyPreserving(_) => "Privacy-Preserving Transaction",
@ -86,10 +83,7 @@ pub fn TransactionPage() -> impl IntoView {
proof,
} = witness_set;
let program_id_str = program_id
.iter()
.map(|n| format!("{:08x}", n))
.collect::<String>();
let program_id_str = program_id.to_string();
let proof_len = proof.0.len();
let signatures_count = signatures_and_public_keys.len();
@ -123,7 +117,7 @@ pub fn TransactionPage() -> impl IntoView {
.into_iter()
.zip(nonces.into_iter())
.map(|(account_id, nonce)| {
let account_id_str = format_utils::format_account_id(&account_id);
let account_id_str = account_id.to_string();
view! {
<div class="account-item">
<A href=format!("/account/{}", account_id_str)>
@ -197,7 +191,7 @@ pub fn TransactionPage() -> impl IntoView {
.into_iter()
.zip(nonces.into_iter())
.map(|(account_id, nonce)| {
let account_id_str = format_utils::format_account_id(&account_id);
let account_id_str = account_id.to_string();
view! {
<div class="account-item">
<A href=format!("/account/{}", account_id_str)>

View File

@ -1,7 +1,7 @@
{
"resubscribe_interval_millis": 1000,
"bedrock_client_config": {
"addr": "http://localhost:18080",
"addr": "http://localhost:8080",
"backoff": {
"start_delay_millis": 100,
"max_retries": 5

View File

@ -10,8 +10,12 @@ nssa = { workspace = true, optional = true }
common = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
serde_with.workspace = true
schemars.workspace = true
base64.workspace = true
base58.workspace = true
hex.workspace = true
anyhow.workspace = true
[features]
# Enable conversion to/from NSSA core types

View File

@ -6,6 +6,18 @@ use crate::*;
// Account-related conversions
// ============================================================================
impl From<[u32; 8]> for ProgramId {
fn from(value: [u32; 8]) -> Self {
Self(value)
}
}
impl From<ProgramId> for [u32; 8] {
fn from(value: ProgramId) -> Self {
value.0
}
}
impl From<nssa_core::account::AccountId> for AccountId {
fn from(value: nssa_core::account::AccountId) -> Self {
Self {
@ -31,7 +43,7 @@ impl From<nssa_core::account::Account> for Account {
} = value;
Self {
program_owner,
program_owner: program_owner.into(),
balance,
data: data.into(),
nonce,
@ -51,7 +63,7 @@ impl TryFrom<Account> for nssa_core::account::Account {
} = value;
Ok(nssa_core::account::Account {
program_owner,
program_owner: program_owner.into(),
balance,
data: data.try_into()?,
nonce,
@ -230,7 +242,7 @@ impl From<nssa::public_transaction::Message> for PublicMessage {
instruction_data,
} = value;
Self {
program_id,
program_id: program_id.into(),
account_ids: account_ids.into_iter().map(Into::into).collect(),
nonces,
instruction_data,
@ -247,7 +259,7 @@ impl From<PublicMessage> for nssa::public_transaction::Message {
instruction_data,
} = value;
Self::new_preserialized(
program_id,
program_id.into(),
account_ids.into_iter().map(Into::into).collect(),
nonces,
instruction_data,
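Because `ProgramId` is now a newtype over `[u32; 8]`, the `From` pair above keeps each conversion boundary to a single `.into()` in either direction:

```rust
use indexer_service_protocol::ProgramId;

fn main() {
    let raw: [u32; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
    let id: ProgramId = raw.into(); // wrap the raw words
    let back: [u32; 8] = id.into(); // unwrap for nssa-core call sites
    assert_eq!(raw, back);
}
```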

View File

@ -3,23 +3,81 @@
//! Currently it mostly mimics types from `nssa_core`, but it's important to have a separate crate
//! to define a stable interface for the indexer service RPCs which evolves in its own way.
use std::{fmt::Display, str::FromStr};
use anyhow::anyhow;
use base58::{FromBase58 as _, ToBase58 as _};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay};
#[cfg(feature = "convert")]
mod convert;
pub type Nonce = u128;
pub type ProgramId = [u32; 8];
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Hash, SerializeDisplay, DeserializeFromStr, JsonSchema,
)]
pub struct ProgramId(pub [u32; 8]);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
impl Display for ProgramId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let bytes: Vec<u8> = self.0.iter().flat_map(|n| n.to_be_bytes()).collect();
write!(f, "{}", bytes.to_base58())
}
}
impl FromStr for ProgramId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = s
.from_base58()
.map_err(|_| hex::FromHexError::InvalidStringLength)?;
if bytes.len() != 32 {
return Err(hex::FromHexError::InvalidStringLength);
}
let mut arr = [0u32; 8];
for (i, chunk) in bytes.chunks_exact(4).enumerate() {
arr[i] = u32::from_be_bytes(chunk.try_into().unwrap());
}
Ok(ProgramId(arr))
}
}
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Hash, SerializeDisplay, DeserializeFromStr, JsonSchema,
)]
pub struct AccountId {
#[serde(with = "base64::arr")]
#[schemars(with = "String", description = "base64-encoded account ID")]
pub value: [u8; 32],
}
impl Display for AccountId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.value.to_base58())
}
}
impl FromStr for AccountId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = s
.from_base58()
.map_err(|err| anyhow!("invalid base58: {err:?}"))?;
if bytes.len() != 32 {
return Err(anyhow!(
"invalid length: expected 32 bytes, got {}",
bytes.len()
));
}
let mut value = [0u8; 32];
value.copy_from_slice(&bytes);
Ok(AccountId { value })
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct Account {
pub program_owner: ProgramId,
@ -48,13 +106,27 @@ pub struct BlockHeader {
pub signature: Signature,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Hash, SerializeDisplay, DeserializeFromStr, JsonSchema)]
pub struct Signature(
#[serde(with = "base64::arr")]
#[schemars(with = "String", description = "base64-encoded signature")]
pub [u8; 64],
#[schemars(with = "String", description = "hex-encoded signature")] pub [u8; 64],
);
impl Display for Signature {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
impl FromStr for Signature {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut bytes = [0u8; 64];
hex::decode_to_slice(s, &mut bytes)?;
Ok(Signature(bytes))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct BlockBody {
pub transactions: Vec<Transaction>,
@ -196,12 +268,26 @@ pub struct Data(
pub Vec<u8>,
);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct HashType(
#[serde(with = "base64::arr")]
#[schemars(with = "String", description = "base64-encoded hash")]
pub [u8; 32],
);
#[derive(
Debug, Copy, Clone, PartialEq, Eq, Hash, SerializeDisplay, DeserializeFromStr, JsonSchema,
)]
pub struct HashType(pub [u8; 32]);
impl Display for HashType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
impl FromStr for HashType {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut bytes = [0u8; 32];
hex::decode_to_slice(s, &mut bytes)?;
Ok(HashType(bytes))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
pub struct MantleMsgId(
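With `SerializeDisplay`/`DeserializeFromStr`, the JSON wire format for these types is now a plain string: hex for hashes and signatures, base58 for account and program ids. A small sketch of the intended round-trip, assuming the impls above:

```rust
use std::str::FromStr as _;

use indexer_service_protocol::{AccountId, HashType};

fn main() {
    // Hashes display as hex and parse back to the same 32 bytes.
    let hash = HashType([0xab; 32]);
    assert_eq!(HashType::from_str(&hash.to_string()).unwrap(), hash);

    // Account IDs display as base58.
    let account = AccountId { value: [7u8; 32] };
    assert_eq!(AccountId::from_str(&account.to_string()).unwrap(), account);
}
```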

View File

@ -10,7 +10,7 @@ compile_error!("At least one of `server` or `client` features must be enabled.")
#[cfg_attr(all(feature = "client", not(feature = "server")), rpc(client))]
#[cfg_attr(all(feature = "server", feature = "client"), rpc(server, client))]
pub trait Rpc {
#[method(name = "get_schema")]
#[method(name = "getSchema")]
fn get_schema(&self) -> Result<serde_json::Value, ErrorObjectOwned> {
// TODO: Canonical solution would be to provide `describe` method returning OpenRPC spec,
// But for now it's painful to implement, although can be done if really needed.
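Only the wire-level method name changes to camelCase; the Rust trait method is still `get_schema`. An illustrative raw JSON-RPC request after the rename:

```json
{"jsonrpc": "2.0", "id": 1, "method": "getSchema", "params": []}
```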

View File

@ -4,8 +4,8 @@ use indexer_service_protocol::{
Account, AccountId, BedrockStatus, Block, BlockBody, BlockHeader, BlockId, Commitment,
CommitmentSetDigest, Data, EncryptedAccountData, HashType, MantleMsgId,
PrivacyPreservingMessage, PrivacyPreservingTransaction, ProgramDeploymentMessage,
ProgramDeploymentTransaction, PublicMessage, PublicTransaction, Signature, Transaction,
WitnessSet,
ProgramDeploymentTransaction, ProgramId, PublicMessage, PublicTransaction, Signature,
Transaction, WitnessSet,
};
use jsonrpsee::{core::SubscriptionResult, types::ErrorObjectOwned};
@ -35,7 +35,7 @@ impl MockIndexerService {
accounts.insert(
*account_id,
Account {
program_owner: [i as u32; 8],
program_owner: ProgramId([i as u32; 8]),
balance: 1000 * (i as u128 + 1),
data: Data(vec![0xaa, 0xbb, 0xcc]),
nonce: i as u128,
@ -73,7 +73,7 @@ impl MockIndexerService {
0 | 1 => Transaction::Public(PublicTransaction {
hash: tx_hash,
message: PublicMessage {
program_id: [1u32; 8],
program_id: ProgramId([1u32; 8]),
account_ids: vec![
account_ids[tx_idx as usize % account_ids.len()],
account_ids[(tx_idx as usize + 1) % account_ids.len()],
@ -95,7 +95,7 @@ impl MockIndexerService {
],
nonces: vec![block_id as u128],
public_post_states: vec![Account {
program_owner: [1u32; 8],
program_owner: ProgramId([1u32; 8]),
balance: 500,
data: Data(vec![0xdd, 0xee]),
nonce: block_id as u128,

View File

@ -321,15 +321,22 @@ impl Drop for TestContext {
/// A test context to be used in normal #[test] tests
pub struct BlockingTestContext {
pub ctx: TestContext,
pub runtime: tokio::runtime::Runtime,
ctx: Option<TestContext>,
runtime: tokio::runtime::Runtime,
}
impl BlockingTestContext {
pub fn new() -> Result<Self> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let ctx = runtime.block_on(TestContext::new())?;
Ok(Self { ctx, runtime })
Ok(Self {
ctx: Some(ctx),
runtime,
})
}
pub fn ctx(&self) -> &TestContext {
self.ctx.as_ref().expect("TestContext is set")
}
}
@ -370,6 +377,19 @@ impl TestContextBuilder {
}
}
impl Drop for BlockingTestContext {
fn drop(&mut self) {
let Self { ctx, runtime } = self;
// Ensure async cleanup of TestContext by blocking on its drop in the runtime.
runtime.block_on(async {
if let Some(ctx) = ctx.take() {
drop(ctx);
}
})
}
}
pub fn format_public_account_id(account_id: AccountId) -> String {
format!("Public/{account_id}")
}
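Holding the `TestContext` in an `Option` lets `Drop` move it into the runtime, so its async teardown finishes before the runtime itself is dropped; callers now go through the `ctx()` accessor. Typical use in a sync test (test name illustrative; constructor and accessor from the diff above):

```rust
#[test]
fn uses_blocking_context() -> anyhow::Result<()> {
    let ctx = BlockingTestContext::new()?;
    // Reach the async-built inner context through the accessor.
    let accounts = ctx.ctx().existing_public_accounts();
    assert!(!accounts.is_empty());
    Ok(())
    // On drop, the runtime blocks on the TestContext cleanup.
}
```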

View File

@ -102,8 +102,8 @@ fn new_wallet_ffi_with_test_context_config(ctx: &BlockingTestContext) -> *mut Wa
let tempdir = tempfile::tempdir().unwrap();
let config_path = tempdir.path().join("wallet_config.json");
let storage_path = tempdir.path().join("storage.json");
let mut config = ctx.ctx.wallet().config().to_owned();
if let Some(config_overrides) = ctx.ctx.wallet().config_overrides().clone() {
let mut config = ctx.ctx().wallet().config().to_owned();
if let Some(config_overrides) = ctx.ctx().wallet().config_overrides().clone() {
config.apply_overrides(config_overrides);
}
let mut file = std::fs::OpenOptions::new()
@ -119,7 +119,7 @@ fn new_wallet_ffi_with_test_context_config(ctx: &BlockingTestContext) -> *mut Wa
let config_path = CString::new(config_path.to_str().unwrap()).unwrap();
let storage_path = CString::new(storage_path.to_str().unwrap()).unwrap();
let password = CString::new(ctx.ctx.wallet_password()).unwrap();
let password = CString::new(ctx.ctx().wallet_password()).unwrap();
unsafe {
wallet_ffi_create_new(
@ -325,7 +325,7 @@ fn test_wallet_ffi_list_accounts() {
#[test]
fn test_wallet_ffi_get_balance_public() -> Result<()> {
let ctx = BlockingTestContext::new()?;
let account_id: AccountId = ctx.ctx.existing_public_accounts()[0];
let account_id: AccountId = ctx.ctx().existing_public_accounts()[0];
let wallet_ffi_handle = new_wallet_ffi_with_test_context_config(&ctx);
let balance = unsafe {
@ -353,7 +353,7 @@ fn test_wallet_ffi_get_balance_public() -> Result<()> {
#[test]
fn test_wallet_ffi_get_account_public() -> Result<()> {
let ctx = BlockingTestContext::new()?;
let account_id: AccountId = ctx.ctx.existing_public_accounts()[0];
let account_id: AccountId = ctx.ctx().existing_public_accounts()[0];
let wallet_ffi_handle = new_wallet_ffi_with_test_context_config(&ctx);
let mut out_account = FfiAccount::default();
@ -388,7 +388,7 @@ fn test_wallet_ffi_get_account_public() -> Result<()> {
#[test]
fn test_wallet_ffi_get_public_account_keys() -> Result<()> {
let ctx = BlockingTestContext::new()?;
let account_id: AccountId = ctx.ctx.existing_public_accounts()[0];
let account_id: AccountId = ctx.ctx().existing_public_accounts()[0];
let wallet_ffi_handle = new_wallet_ffi_with_test_context_config(&ctx);
let mut out_key = FfiPublicAccountKey::default();
@ -404,7 +404,7 @@ fn test_wallet_ffi_get_public_account_keys() -> Result<()> {
let expected_key = {
let private_key = ctx
.ctx
.ctx()
.wallet()
.get_account_public_signing_key(account_id)
.unwrap();
@ -425,7 +425,7 @@ fn test_wallet_ffi_get_public_account_keys() -> Result<()> {
#[test]
fn test_wallet_ffi_get_private_account_keys() -> Result<()> {
let ctx = BlockingTestContext::new()?;
let account_id: AccountId = ctx.ctx.existing_public_accounts()[0];
let account_id: AccountId = ctx.ctx().existing_private_accounts()[0];
let wallet_ffi_handle = new_wallet_ffi_with_test_context_config(&ctx);
let mut keys = FfiPrivateAccountKeys::default();
@ -439,7 +439,7 @@ fn test_wallet_ffi_get_private_account_keys() -> Result<()> {
};
let key_chain = &ctx
.ctx
.ctx()
.wallet()
.storage()
.user_data
@ -567,8 +567,8 @@ fn test_wallet_ffi_init_public_account_auth_transfer() -> Result<()> {
fn test_wallet_ffi_transfer_public() -> Result<()> {
let ctx = BlockingTestContext::new().unwrap();
let wallet_ffi_handle = new_wallet_ffi_with_test_context_config(&ctx);
let from: FfiBytes32 = (&ctx.ctx.existing_public_accounts()[0]).into();
let to: FfiBytes32 = (&ctx.ctx.existing_public_accounts()[1]).into();
let from: FfiBytes32 = (&ctx.ctx().existing_public_accounts()[0]).into();
let to: FfiBytes32 = (&ctx.ctx().existing_public_accounts()[1]).into();
let amount: [u8; 16] = 100u128.to_le_bytes();
let mut transfer_result = FfiTransferResult::default();

View File

@ -23,7 +23,7 @@ pub trait BlockSettlementClientTrait: Clone {
fn bedrock_signing_key(&self) -> &Ed25519Key;
/// Post a transaction to the node.
async fn submit_block_to_bedrock(&self, block: &Block) -> Result<MsgId>;
async fn submit_inscribe_tx_to_bedrock(&self, tx: SignedMantleTx) -> Result<()>;
/// Create and sign a transaction for inscribing data.
fn create_inscribe_tx(&self, block: &Block) -> Result<(SignedMantleTx, MsgId)> {
@ -89,16 +89,13 @@ impl BlockSettlementClientTrait for BlockSettlementClient {
})
}
async fn submit_block_to_bedrock(&self, block: &Block) -> Result<MsgId> {
let (tx, new_msg_id) = self.create_inscribe_tx(block)?;
// Post the transaction
async fn submit_inscribe_tx_to_bedrock(&self, tx: SignedMantleTx) -> Result<()> {
self.bedrock_client
.post_transaction(tx)
.await
.context("Failed to post transaction to Bedrock")?;
Ok(new_msg_id)
Ok(())
}
fn bedrock_channel_id(&self) -> ChannelId {
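The trait now splits building the inscribe tx from submitting it, so the sequencer can record the `MsgId` before any network I/O and rebuild the identical tx when retrying. A hedged sketch of the two-phase flow (the `settle` helper is illustrative):

```rust
use anyhow::Result;
use common::block::Block;

async fn settle<C: BlockSettlementClientTrait>(client: &C, block: &Block) -> Result<()> {
    // Phase 1: derive (tx, msg_id) locally; persist msg_id with the block here.
    let (tx, msg_id) = client.create_inscribe_tx(block)?;
    // Phase 2: submit; a retry can recreate the same tx from the stored block.
    client.submit_inscribe_tx_to_bedrock(tx).await?;
    log::info!("Posted block to Bedrock, msg_id: {msg_id:?}");
    Ok(())
}
```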

View File

@ -1,7 +1,11 @@
use std::{collections::HashMap, path::Path};
use anyhow::Result;
use common::{HashType, block::Block, transaction::NSSATransaction};
use common::{
HashType,
block::{Block, BlockMeta, MantleMsgId},
transaction::NSSATransaction,
};
use nssa::V02State;
use storage::RocksDBIO;
@ -20,10 +24,10 @@ impl SequencerStore {
/// ATTENTION: Will overwrite genesis block.
pub fn open_db_with_genesis(
location: &Path,
genesis_block: Option<&Block>,
genesis_block: Option<(&Block, MantleMsgId)>,
signing_key: nssa::PrivateKey,
) -> Result<Self> {
let tx_hash_to_block_map = if let Some(block) = &genesis_block {
let tx_hash_to_block_map = if let Some((block, _msg_id)) = &genesis_block {
block_to_transactions_map(block)
} else {
HashMap::new()
@ -54,6 +58,10 @@ impl SequencerStore {
Ok(self.dbio.delete_block(block_id)?)
}
pub fn mark_block_as_finalized(&mut self, block_id: u64) -> Result<()> {
Ok(self.dbio.mark_block_as_finalized(block_id)?)
}
/// Returns the transaction corresponding to the given hash, if it exists in the blockchain.
pub fn get_transaction_by_hash(&self, hash: HashType) -> Option<NSSATransaction> {
let block_id = self.tx_hash_to_block_map.get(&hash);
@ -68,8 +76,8 @@ impl SequencerStore {
None
}
pub fn insert(&mut self, tx: &NSSATransaction, block_id: u64) {
self.tx_hash_to_block_map.insert(tx.hash(), block_id);
pub fn latest_block_meta(&self) -> Result<BlockMeta> {
Ok(self.dbio.latest_block_meta()?)
}
pub fn genesis_id(&self) -> u64 {
@ -84,9 +92,14 @@ impl SequencerStore {
self.dbio.get_all_blocks().map(|res| Ok(res?))
}
pub(crate) fn update(&mut self, block: &Block, state: &V02State) -> Result<()> {
pub(crate) fn update(
&mut self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> Result<()> {
let new_transactions_map = block_to_transactions_map(block);
self.dbio.atomic_update(block, state)?;
self.dbio.atomic_update(block, msg_id, state)?;
self.tx_hash_to_block_map.extend(new_transactions_map);
Ok(())
}
@ -128,8 +141,12 @@ mod tests {
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
// Start an empty node store
let mut node_store =
SequencerStore::open_db_with_genesis(path, Some(&genesis_block), signing_key).unwrap();
let mut node_store = SequencerStore::open_db_with_genesis(
path,
Some((&genesis_block, [0; 32])),
signing_key,
)
.unwrap();
let tx = common::test_utils::produce_dummy_empty_transaction();
let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
@ -139,9 +156,126 @@ mod tests {
assert_eq!(None, retrieved_tx);
// Add the block with the transaction
let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
node_store.update(&block, &dummy_state).unwrap();
node_store.update(&block, [1; 32], &dummy_state).unwrap();
// Try again
let retrieved_tx = node_store.get_transaction_by_hash(tx.hash());
assert_eq!(Some(tx), retrieved_tx);
}
#[test]
fn test_latest_block_meta_returns_genesis_meta_initially() {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path();
let signing_key = sequencer_sign_key_for_testing();
let genesis_block_hashable_data = HashableBlockData {
block_id: 0,
prev_block_hash: HashType([0; 32]),
timestamp: 0,
transactions: vec![],
};
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
let genesis_hash = genesis_block.header.hash;
let node_store = SequencerStore::open_db_with_genesis(
path,
Some((&genesis_block, [0; 32])),
signing_key,
)
.unwrap();
// Verify that initially the latest block hash equals genesis hash
let latest_meta = node_store.latest_block_meta().unwrap();
assert_eq!(latest_meta.hash, genesis_hash);
assert_eq!(latest_meta.msg_id, [0; 32]);
}
#[test]
fn test_latest_block_meta_updates_after_new_block() {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path();
let signing_key = sequencer_sign_key_for_testing();
let genesis_block_hashable_data = HashableBlockData {
block_id: 0,
prev_block_hash: HashType([0; 32]),
timestamp: 0,
transactions: vec![],
};
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
let mut node_store = SequencerStore::open_db_with_genesis(
path,
Some((&genesis_block, [0; 32])),
signing_key,
)
.unwrap();
// Add a new block
let tx = common::test_utils::produce_dummy_empty_transaction();
let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
let block_hash = block.header.hash;
let block_msg_id = [1; 32];
let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
node_store
.update(&block, block_msg_id, &dummy_state)
.unwrap();
// Verify that the latest block meta now equals the new block's hash and msg_id
let latest_meta = node_store.latest_block_meta().unwrap();
assert_eq!(latest_meta.hash, block_hash);
assert_eq!(latest_meta.msg_id, block_msg_id);
}
#[test]
fn test_mark_block_finalized() {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path();
let signing_key = sequencer_sign_key_for_testing();
let genesis_block_hashable_data = HashableBlockData {
block_id: 0,
prev_block_hash: HashType([0; 32]),
timestamp: 0,
transactions: vec![],
};
let genesis_block = genesis_block_hashable_data.into_pending_block(&signing_key, [0; 32]);
let mut node_store = SequencerStore::open_db_with_genesis(
path,
Some((&genesis_block, [0; 32])),
signing_key,
)
.unwrap();
// Add a new block with Pending status
let tx = common::test_utils::produce_dummy_empty_transaction();
let block = common::test_utils::produce_dummy_block(1, None, vec![tx.clone()]);
let block_id = block.header.block_id;
let dummy_state = V02State::new_with_genesis_accounts(&[], &[]);
node_store.update(&block, [1; 32], &dummy_state).unwrap();
// Verify initial status is Pending
let retrieved_block = node_store.get_block_at_id(block_id).unwrap();
assert!(matches!(
retrieved_block.bedrock_status,
common::block::BedrockStatus::Pending
));
// Mark block as finalized
node_store.mark_block_as_finalized(block_id).unwrap();
// Verify status is now Finalized
let finalized_block = node_store.get_block_at_id(block_id).unwrap();
assert!(matches!(
finalized_block.bedrock_status,
common::block::BedrockStatus::Finalized
));
}
}

View File

@ -1,11 +1,12 @@
use std::{fmt::Display, path::Path, time::Instant};
use anyhow::{Result, anyhow};
use anyhow::{Context as _, Result, anyhow};
use bedrock_client::SignedMantleTx;
#[cfg(feature = "testnet")]
use common::PINATA_BASE58;
use common::{
HashType,
block::{BedrockStatus, Block, HashableBlockData, MantleMsgId},
block::{BedrockStatus, Block, HashableBlockData},
transaction::NSSATransaction,
};
use config::SequencerConfig;
@ -15,7 +16,7 @@ use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
use crate::{
block_settlement_client::{BlockSettlementClient, BlockSettlementClientTrait},
block_settlement_client::{BlockSettlementClient, BlockSettlementClientTrait, MsgId},
block_store::SequencerStore,
indexer_client::{IndexerClient, IndexerClientTrait},
};
@ -38,7 +39,6 @@ pub struct SequencerCore<
chain_height: u64,
block_settlement_client: BC,
indexer_client: IC,
last_bedrock_msg_id: MantleMsgId,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -75,14 +75,32 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
let genesis_parent_msg_id = [0; 32];
let genesis_block = hashable_data.into_pending_block(&signing_key, genesis_parent_msg_id);
let bedrock_signing_key =
load_or_create_signing_key(&config.home.join("bedrock_signing_key"))
.expect("Failed to load or create bedrock signing key");
let block_settlement_client = BC::new(&config.bedrock_config, bedrock_signing_key)
.expect("Failed to initialize Block Settlement Client");
let indexer_client = IC::new(&config.indexer_rpc_url)
.await
.expect("Failed to create Indexer Client");
let (_tx, genesis_msg_id) = block_settlement_client
.create_inscribe_tx(&genesis_block)
.expect("Failed to create inscribe tx for genesis block");
// Sequencer should panic if unable to open db,
// as fixing this issue may require actions non-native to program scope
let store = SequencerStore::open_db_with_genesis(
&config.home.join("rocksdb"),
Some(&genesis_block),
Some((&genesis_block, genesis_msg_id.into())),
signing_key,
)
.unwrap();
let latest_block_meta = store
.latest_block_meta()
.expect("Failed to read latest block meta from store");
#[cfg_attr(not(feature = "testnet"), allow(unused_mut))]
let mut state = match store.get_nssa_state() {
@ -123,25 +141,15 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
state.add_pinata_program(PINATA_BASE58.parse().unwrap());
let (mempool, mempool_handle) = MemPool::new(config.mempool_max_size);
let bedrock_signing_key =
load_or_create_signing_key(&config.home.join("bedrock_signing_key"))
.expect("Failed to load or create signing key");
let block_settlement_client = BC::new(&config.bedrock_config, bedrock_signing_key)
.expect("Failed to initialize Block Settlement Client");
let indexer_client = IC::new(&config.indexer_rpc_url)
.await
.expect("Failed to create Indexer Client");
let sequencer_core = Self {
state,
store,
mempool,
chain_height: config.genesis_id,
chain_height: latest_block_meta.id,
sequencer_config: config,
block_settlement_client,
indexer_client,
last_bedrock_msg_id: genesis_parent_msg_id,
};
(sequencer_core, mempool_handle)
@ -165,29 +173,18 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
Ok(tx)
}
pub async fn produce_new_block_and_post_to_settlement_layer(&mut self) -> Result<u64> {
{
let block = self.produce_new_block_with_mempool_transactions()?;
match self
.block_settlement_client
.submit_block_to_bedrock(&block)
.await
{
Ok(msg_id) => {
self.last_bedrock_msg_id = msg_id.into();
info!("Posted block data to Bedrock, msg_id: {msg_id:?}");
}
Err(err) => {
error!("Failed to post block data to Bedrock with error: {err:#}");
}
}
}
pub async fn produce_new_block(&mut self) -> Result<u64> {
let (_tx, _msg_id) = self
.produce_new_block_with_mempool_transactions()
.context("Failed to produce new block with mempool transactions")?;
Ok(self.chain_height)
}
/// Produces new block from transactions in mempool
pub fn produce_new_block_with_mempool_transactions(&mut self) -> Result<Block> {
/// Produces new block from transactions in mempool and packs it into a SignedMantleTx.
pub fn produce_new_block_with_mempool_transactions(
&mut self,
) -> Result<(SignedMantleTx, MsgId)> {
let now = Instant::now();
let new_block_height = self.chain_height + 1;
@ -214,41 +211,44 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
}
let prev_block_hash = self.store.get_block_at_id(self.chain_height)?.header.hash;
let latest_block_meta = self
.store
.latest_block_meta()
.context("Failed to get latest block meta from store")?;
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
let hashable_data = HashableBlockData {
block_id: new_block_height,
transactions: valid_transactions,
prev_block_hash,
prev_block_hash: latest_block_meta.hash,
timestamp: curr_time,
};
let block = hashable_data
.clone()
.into_pending_block(self.store.signing_key(), self.last_bedrock_msg_id);
.into_pending_block(self.store.signing_key(), latest_block_meta.msg_id);
self.store.update(&block, &self.state)?;
let (tx, msg_id) = self
.block_settlement_client
.create_inscribe_tx(&block)
.with_context(|| {
format!(
"Failed to create inscribe transaction for block with id {}",
block.header.block_id
)
})?;
self.store.update(&block, msg_id.into(), &self.state)?;
self.chain_height = new_block_height;
// TODO: Consider switching to `tracing` crate to have more structured and consistent logs
// e.g.
//
// ```
// info!(
// num_txs = num_txs_in_block,
// time = now.elapsed(),
// "Created block"
// );
// ```
log::info!(
"Created block with {} transactions in {} seconds",
hashable_data.transactions.len(),
now.elapsed().as_secs()
);
Ok(block)
Ok((tx, msg_id))
}
pub fn state(&self) -> &nssa::V02State {
@ -282,8 +282,10 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
"Clearing pending blocks up to id: {}",
last_finalized_block_id
);
// TODO: Delete blocks instead of marking them as finalized.
// Current approach is used because we still have `GetBlockDataRequest`.
(first_pending_block_id..=last_finalized_block_id)
.try_for_each(|id| self.store.delete_block_at_id(id))
.try_for_each(|id| self.store.mark_block_as_finalized(id))
} else {
Ok(())
}
@ -344,6 +346,10 @@ fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
} else {
let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE];
rand::RngCore::fill_bytes(&mut rand::thread_rng(), &mut key_bytes);
// Create parent directory if it doesn't exist
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(path, key_bytes)?;
Ok(Ed25519Key::from_bytes(&key_bytes))
}
@ -648,9 +654,9 @@ mod tests {
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
let block = sequencer.produce_new_block_with_mempool_transactions();
assert!(block.is_ok());
assert_eq!(block.unwrap().header.block_id, genesis_height + 1);
let result = sequencer.produce_new_block_with_mempool_transactions();
assert!(result.is_ok());
assert_eq!(sequencer.chain_height, genesis_height + 1);
}
#[tokio::test]
@ -673,12 +679,13 @@ mod tests {
mempool_handle.push(tx_replay).await.unwrap();
// Create block
let current_height = sequencer
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap()
.header
.block_id;
let block = sequencer.store.get_block_at_id(current_height).unwrap();
.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
.unwrap();
// Only one should be included in the block
assert_eq!(block.body.transactions, vec![tx.clone()]);
@ -699,22 +706,24 @@ mod tests {
// The transaction should be included the first time
mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap()
.header
.block_id;
let block = sequencer.store.get_block_at_id(current_height).unwrap();
.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
.unwrap();
assert_eq!(block.body.transactions, vec![tx.clone()]);
// Add same transaction should fail
mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap()
.header
.block_id;
let block = sequencer.store.get_block_at_id(current_height).unwrap();
.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
.unwrap();
assert!(block.body.transactions.is_empty());
}
@ -742,12 +751,13 @@ mod tests {
);
mempool_handle.push(tx.clone()).await.unwrap();
let current_height = sequencer
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap()
.header
.block_id;
let block = sequencer.store.get_block_at_id(current_height).unwrap();
.unwrap();
let block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
.unwrap();
assert_eq!(block.body.transactions, vec![tx.clone()]);
}
@ -808,4 +818,126 @@ mod tests {
assert_eq!(sequencer.get_pending_blocks().unwrap().len(), 1);
}
#[tokio::test]
async fn test_produce_block_with_correct_prev_meta_after_restart() {
let config = setup_sequencer_config();
let acc1_account_id = config.initial_accounts[0].account_id;
let acc2_account_id = config.initial_accounts[1].account_id;
// Step 1: Create initial database with some block metadata
let expected_prev_meta = {
let (mut sequencer, mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
// Add a transaction and produce a block to set up block metadata
let tx = common::test_utils::create_transaction_native_token_transfer(
acc1_account_id,
0,
acc2_account_id,
100,
signing_key,
);
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
// Get the metadata of the last block produced
sequencer.store.latest_block_meta().unwrap()
};
// Step 2: Restart sequencer from the same storage
let (mut sequencer, mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
// Step 3: Submit a new transaction
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer(
acc1_account_id,
1, // Next nonce
acc2_account_id,
50,
signing_key,
);
mempool_handle.push(tx.clone()).await.unwrap();
// Step 4: Produce new block
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
// Step 5: Verify the new block has correct previous block metadata
let new_block = sequencer
.store
.get_block_at_id(sequencer.chain_height)
.unwrap();
assert_eq!(
new_block.header.prev_block_hash, expected_prev_meta.hash,
"New block's prev_block_hash should match the stored metadata hash"
);
assert_eq!(
new_block.bedrock_parent_id, expected_prev_meta.msg_id,
"New block's bedrock_parent_id should match the stored metadata msg_id"
);
assert_eq!(
new_block.body.transactions,
vec![tx],
"New block should contain the submitted transaction"
);
}
#[tokio::test]
async fn test_start_from_config_uses_db_height_not_config_genesis() {
let mut config = setup_sequencer_config();
let original_genesis_id = config.genesis_id;
// Step 1: Create initial database and produce some blocks
let expected_chain_height = {
let (mut sequencer, mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
// Verify we start with the genesis_id from config
assert_eq!(sequencer.chain_height, original_genesis_id);
// Produce multiple blocks to advance chain height
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
sequencer
.produce_new_block_with_mempool_transactions()
.unwrap();
// Return the current chain height (should be genesis_id + 2)
sequencer.chain_height
};
// Step 2: Modify the config to have a DIFFERENT genesis_id
let different_genesis_id = original_genesis_id + 100;
config.genesis_id = different_genesis_id;
// Step 3: Restart sequencer with the modified config (different genesis_id)
let (sequencer, _mempool_handle) =
SequencerCoreWithMockClients::start_from_config(config.clone()).await;
// Step 4: Verify chain_height comes from database, NOT from the new config.genesis_id
assert_eq!(
sequencer.chain_height, expected_chain_height,
"Chain height should be loaded from database metadata, not config.genesis_id"
);
assert_ne!(
sequencer.chain_height, different_genesis_id,
"Chain height should NOT match the modified config.genesis_id"
);
}
}

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use common::block::Block;
use logos_blockchain_core::mantle::ops::channel::{ChannelId, MsgId};
use anyhow::{Result, anyhow};
use bedrock_client::SignedMantleTx;
use logos_blockchain_core::mantle::ops::channel::ChannelId;
use logos_blockchain_key_management_system_service::keys::Ed25519Key;
use url::Url;
@ -34,8 +34,35 @@ impl BlockSettlementClientTrait for MockBlockSettlementClient {
&self.bedrock_signing_key
}
async fn submit_block_to_bedrock(&self, block: &Block) -> Result<MsgId> {
self.create_inscribe_tx(block).map(|(_, msg_id)| msg_id)
async fn submit_inscribe_tx_to_bedrock(&self, _tx: SignedMantleTx) -> Result<()> {
Ok(())
}
}
#[derive(Clone)]
pub struct MockBlockSettlementClientWithError {
bedrock_channel_id: ChannelId,
bedrock_signing_key: Ed25519Key,
}
impl BlockSettlementClientTrait for MockBlockSettlementClientWithError {
fn new(config: &BedrockConfig, bedrock_signing_key: Ed25519Key) -> Result<Self> {
Ok(Self {
bedrock_channel_id: config.channel_id,
bedrock_signing_key,
})
}
fn bedrock_channel_id(&self) -> ChannelId {
self.bedrock_channel_id
}
fn bedrock_signing_key(&self) -> &Ed25519Key {
&self.bedrock_signing_key
}
async fn submit_inscribe_tx_to_bedrock(&self, _tx: SignedMantleTx) -> Result<()> {
Err(anyhow!("Mock error"))
}
}

View File

@ -5,8 +5,8 @@
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"mempool_max_size": 1000,
"block_create_timeout_millis": 5000,
"retry_pending_blocks_timeout_millis": 7000,
"block_create_timeout_millis": 12000,
"retry_pending_blocks_timeout_millis": 6000,
"port": 3040,
"bedrock_config": {
"backoff": {
@ -14,7 +14,7 @@
"max_retries": 5
},
"channel_id": "0101010101010101010101010101010101010101010101010101010101010101",
"node_url": "http://localhost:18080"
"node_url": "http://localhost:8080"
},
"indexer_rpc_url": "ws://localhost:8779",
"initial_accounts": [

View File

@ -147,9 +147,7 @@ async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration)
let id = {
let mut state = seq_core.lock().await;
state
.produce_new_block_and_post_to_settlement_layer()
.await?
state.produce_new_block().await?
};
info!("Block with id {id} created");
@ -174,12 +172,27 @@ async fn retry_pending_blocks_loop(
(pending_blocks, client)
};
info!("Resubmitting {} pending blocks", pending_blocks.len());
for block in &pending_blocks {
if let Err(e) = block_settlement_client.submit_block_to_bedrock(block).await {
if let Some(block) = pending_blocks
.iter()
.min_by_key(|block| block.header.block_id)
{
info!(
"Resubmitting pending block with id {}",
block.header.block_id
);
// TODO: We could cache the inscribe tx for each pending block to avoid re-creating it
// on every retry.
let (tx, _msg_id) = block_settlement_client
.create_inscribe_tx(block)
.context("Failed to create inscribe tx for pending block")?;
if let Err(e) = block_settlement_client
.submit_inscribe_tx_to_bedrock(tx)
.await
{
warn!(
"Failed to resubmit block with id {} with error {}",
block.header.block_id, e
"Failed to resubmit block with id {} with error {e:#}",
block.header.block_id
);
}
}

View File

@ -1,16 +1,18 @@
#[derive(thiserror::Error, Debug)]
pub enum DbError {
#[error("RocksDb error")]
#[error("RocksDb error: {}", additional_info.as_deref().unwrap_or("No additional info"))]
RocksDbError {
#[source]
error: rocksdb::Error,
additional_info: Option<String>,
},
#[error("Serialization error")]
#[error("Serialization error: {}", additional_info.as_deref().unwrap_or("No additional info"))]
SerializationError {
#[source]
error: borsh::io::Error,
additional_info: Option<String>,
},
#[error("Logic Error")]
#[error("Logic Error: {additional_info}")]
DbInteractionError { additional_info: String },
}
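Folding `additional_info` into `Display` means wrapped errors surface their context without walking the source chain. A sketch of the resulting message (module path assumed; the variant and message text come from this PR):

```rust
use storage::error::DbError; // path assumed

fn main() {
    let err = DbError::DbInteractionError {
        additional_info: "Latest block meta not found".to_string(),
    };
    assert_eq!(err.to_string(), "Logic Error: Latest block meta not found");
}
```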

View File

@ -1,6 +1,6 @@
use std::{path::Path, sync::Arc};
use common::block::Block;
use common::block::{BedrockStatus, Block, BlockMeta, MantleMsgId};
use error::DbError;
use nssa::V02State;
use rocksdb::{
@ -29,6 +29,8 @@ pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
/// Key base for storing metainformation about the last finalized block on Bedrock
pub const DB_META_LAST_FINALIZED_BLOCK_ID: &str = "last_finalized_block_id";
/// Key base for storing metainformation about the latest block meta
pub const DB_META_LATEST_BLOCK_META_KEY: &str = "latest_block_meta";
/// Key base for storing the NSSA state
pub const DB_NSSA_STATE_KEY: &str = "nssa_state";
@ -47,7 +49,10 @@ pub struct RocksDBIO {
}
impl RocksDBIO {
pub fn open_or_create(path: &Path, start_block: Option<&Block>) -> DbResult<Self> {
pub fn open_or_create(
path: &Path,
start_block: Option<(&Block, MantleMsgId)>,
) -> DbResult<Self> {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
// ToDo: Add more column families for different data
@ -73,12 +78,17 @@ impl RocksDBIO {
if is_start_set {
Ok(dbio)
} else if let Some(block) = start_block {
} else if let Some((block, msg_id)) = start_block {
let block_id = block.header.block_id;
dbio.put_meta_first_block_in_db(block)?;
dbio.put_meta_first_block_in_db(block, msg_id)?;
dbio.put_meta_is_first_block_set()?;
dbio.put_meta_last_block_in_db(block_id)?;
dbio.put_meta_last_finalized_block_id(None)?;
dbio.put_meta_latest_block_meta(&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
})?;
Ok(dbio)
} else {
@ -208,7 +218,7 @@ impl RocksDBIO {
Ok(())
}
pub fn put_meta_first_block_in_db(&self, block: &Block) -> DbResult<()> {
pub fn put_meta_first_block_in_db(&self, block: &Block, msg_id: MantleMsgId) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
@ -229,7 +239,7 @@ impl RocksDBIO {
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
let mut batch = WriteBatch::default();
self.put_block(block, true, &mut batch)?;
self.put_block(block, msg_id, true, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
@ -262,6 +272,30 @@ impl RocksDBIO {
Ok(())
}
fn put_meta_last_block_in_db_batch(
&self,
block_id: u64,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LAST_BLOCK_IN_DB_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LAST_BLOCK_IN_DB_KEY".to_string()),
)
})?,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize last block id".to_string()),
)
})?,
);
Ok(())
}
pub fn put_meta_last_finalized_block_id(&self, block_id: Option<u64>) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
@ -301,14 +335,103 @@ impl RocksDBIO {
Ok(())
}
pub fn put_block(&self, block: &Block, first: bool, batch: &mut WriteBatch) -> DbResult<()> {
fn put_meta_latest_block_meta(&self, block_meta: &BlockMeta) -> DbResult<()> {
let cf_meta = self.meta_column();
self.db
.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
Ok(())
}
fn put_meta_latest_block_meta_batch(
&self,
block_meta: &BlockMeta,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_meta = self.meta_column();
batch.put_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
borsh::to_vec(&block_meta).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize latest block meta".to_string()),
)
})?,
);
Ok(())
}
pub fn latest_block_meta(&self) -> DbResult<BlockMeta> {
let cf_meta = self.meta_column();
let res = self
.db
.get_cf(
&cf_meta,
borsh::to_vec(&DB_META_LATEST_BLOCK_META_KEY).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize DB_META_LATEST_BLOCK_META_KEY".to_string()),
)
})?,
)
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
if let Some(data) = res {
Ok(borsh::from_slice::<BlockMeta>(&data).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to deserialize latest block meta".to_string()),
)
})?)
} else {
Err(DbError::db_interaction_error(
"Latest block meta not found".to_string(),
))
}
}
pub fn put_block(
&self,
block: &Block,
msg_id: MantleMsgId,
first: bool,
batch: &mut WriteBatch,
) -> DbResult<()> {
let cf_block = self.block_column();
if !first {
let last_curr_block = self.get_meta_last_block_in_db()?;
if block.header.block_id > last_curr_block {
self.put_meta_last_block_in_db(block.header.block_id)?;
self.put_meta_last_block_in_db_batch(block.header.block_id, batch)?;
self.put_meta_latest_block_meta_batch(
&BlockMeta {
id: block.header.block_id,
hash: block.header.hash,
msg_id,
},
batch,
)?;
}
}
@ -406,6 +529,37 @@ impl RocksDBIO {
Ok(())
}
pub fn mark_block_as_finalized(&self, block_id: u64) -> DbResult<()> {
let mut block = self.get_block(block_id)?;
block.bedrock_status = BedrockStatus::Finalized;
let cf_block = self.block_column();
self.db
.put_cf(
&cf_block,
borsh::to_vec(&block_id).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block id".to_string()),
)
})?,
borsh::to_vec(&block).map_err(|err| {
DbError::borsh_cast_message(
err,
Some("Failed to serialize block data".to_string()),
)
})?,
)
.map_err(|rerr| {
DbError::rocksdb_cast_message(
rerr,
Some(format!("Failed to mark block {block_id} as finalized")),
)
})?;
Ok(())
}
pub fn get_all_blocks(&self) -> impl Iterator<Item = DbResult<Block>> {
let cf_block = self.block_column();
self.db
@ -427,10 +581,15 @@ impl RocksDBIO {
})
}
pub fn atomic_update(&self, block: &Block, state: &V02State) -> DbResult<()> {
pub fn atomic_update(
&self,
block: &Block,
msg_id: MantleMsgId,
state: &V02State,
) -> DbResult<()> {
let block_id = block.header.block_id;
let mut batch = WriteBatch::default();
self.put_block(block, false, &mut batch)?;
self.put_block(block, msg_id, false, &mut batch)?;
self.put_nssa_state_in_db(state, &mut batch)?;
self.db.write(batch).map_err(|rerr| {
DbError::rocksdb_cast_message(
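On first open the genesis block's meta is written alongside the block itself, so `latest_block_meta` is immediately available. A minimal bootstrap sketch, assuming the signatures in this diff (genesis block construction elided):

```rust
use common::block::{Block, BlockMeta, MantleMsgId};
use storage::RocksDBIO;

fn bootstrap(path: &std::path::Path, genesis: &Block) -> anyhow::Result<()> {
    let msg_id: MantleMsgId = [0u8; 32];
    let db = RocksDBIO::open_or_create(path, Some((genesis, msg_id)))?;
    // Fresh DB: the latest meta is the genesis meta just written.
    let meta: BlockMeta = db.latest_block_meta()?;
    assert_eq!(meta.msg_id, msg_id);
    assert_eq!(meta.id, genesis.header.block_id);
    Ok(())
}
```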

View File

@ -22,7 +22,6 @@ clap.workspace = true
base64.workspace = true
bytemuck.workspace = true
borsh.workspace = true
base58.workspace = true
hex.workspace = true
rand.workspace = true
itertools.workspace = true

View File

@ -1,5 +1,4 @@
use anyhow::Result;
use base58::ToBase58;
use clap::Subcommand;
use itertools::Itertools as _;
use key_protocol::key_management::key_tree::chain_index::ChainIndex;
@ -104,8 +103,7 @@ impl WalletSubcommand for NewSubcommand {
.unwrap();
println!(
"Generated new account with account_id Private/{} at path {chain_index}",
account_id.to_bytes().to_base58()
"Generated new account with account_id Private/{account_id} at path {chain_index}",
);
println!("With npk {}", hex::encode(key.nullifer_public_key.0));
println!(